From 9796d158bf735251c181d6b74e5d15a200b3c559 Mon Sep 17 00:00:00 2001 From: John Gozde Date: Sun, 14 Oct 2012 11:21:38 -0600 Subject: [PATCH 1/3] Avoid closure on RetryIfInterrupted calls. --- src/ZeroMQ/Interop/MethodHelper.cs | 33 ++++++++++++++++++++ src/ZeroMQ/Interop/SocketProxy.cs | 48 ++++++++++-------------------- src/ZeroMQ/ZeroMQ.csproj | 1 + 3 files changed, 50 insertions(+), 32 deletions(-) create mode 100644 src/ZeroMQ/Interop/MethodHelper.cs diff --git a/src/ZeroMQ/Interop/MethodHelper.cs b/src/ZeroMQ/Interop/MethodHelper.cs new file mode 100644 index 0000000..81d3982 --- /dev/null +++ b/src/ZeroMQ/Interop/MethodHelper.cs @@ -0,0 +1,33 @@ +namespace ZeroMQ.Interop +{ + using System; + + internal static class MethodHelper + { + public static int RetryIfInterrupted(Func operation, T1 arg1, T2 arg2, T3 arg3) + { + int rc; + + do + { + rc = operation(arg1, arg2, arg3); + } + while (rc == -1 && LibZmq.zmq_errno() == ErrorCode.EINTR); + + return rc; + } + + public static int RetryIfInterrupted(Func operation, T1 arg1, T2 arg2, T3 arg3, T4 arg4) + { + int rc; + + do + { + rc = operation(arg1, arg2, arg3, arg4); + } + while (rc == -1 && LibZmq.zmq_errno() == ErrorCode.EINTR); + + return rc; + } + } +} diff --git a/src/ZeroMQ/Interop/SocketProxy.cs b/src/ZeroMQ/Interop/SocketProxy.cs index 22723e3..00086fc 100644 --- a/src/ZeroMQ/Interop/SocketProxy.cs +++ b/src/ZeroMQ/Interop/SocketProxy.cs @@ -3,7 +3,6 @@ using System; using System.Runtime.InteropServices; - // ReSharper disable AccessToDisposedClosure internal class SocketProxy : IDisposable { public const int MaxBufferSize = 8192; @@ -85,7 +84,7 @@ public int Receive(byte[] buffer, int flags) // Use zmq_buffer_recv method if appropriate -> results in fewer P/Invoke calls if (buffer.Length <= MaxBufferSize && LibZmq.zmq_buffer_recv != null) { - int bytesReceived = RetryIfInterrupted(() => LibZmq.zmq_buffer_recv(SocketHandle, _receiveIntPtrBuffer, MaxBufferSize, flags)); + int bytesReceived = MethodHelper.RetryIfInterrupted(LibZmq.zmq_buffer_recv.Invoke, SocketHandle, _receiveIntPtrBuffer, MaxBufferSize, flags); int size = Math.Min(buffer.Length, bytesReceived); Marshal.Copy(_receiveIntPtrBuffer, buffer, 0, size); @@ -100,7 +99,7 @@ public int Receive(byte[] buffer, int flags) return -1; } - int bytesReceived = RetryIfInterrupted(() => LibZmq.zmq_msg_recv(message, SocketHandle, flags)); + int bytesReceived = MethodHelper.RetryIfInterrupted(LibZmq.zmq_msg_recv.Invoke, message.Ptr, SocketHandle, flags); if (bytesReceived == 0 && LibZmq.MajorVersion < 3) { @@ -133,7 +132,7 @@ public byte[] Receive(byte[] buffer, int flags, out int size) return buffer; } - int bytesReceived = RetryIfInterrupted(() => LibZmq.zmq_msg_recv(message, SocketHandle, flags)); + int bytesReceived = MethodHelper.RetryIfInterrupted(LibZmq.zmq_msg_recv.Invoke, message.Ptr, SocketHandle, flags); if (bytesReceived >= 0) { @@ -170,7 +169,7 @@ public int Send(byte[] buffer, int size, int flags) int sizeToSend = Math.Min(size, MaxBufferSize); Marshal.Copy(buffer, 0, _sendIntPtrBuffer, sizeToSend); - return RetryIfInterrupted(() => LibZmq.zmq_buffer_send(SocketHandle, _sendIntPtrBuffer, sizeToSend, flags)); + return MethodHelper.RetryIfInterrupted(LibZmq.zmq_buffer_send.Invoke, SocketHandle, _sendIntPtrBuffer, sizeToSend, flags); } using (var message = new ZmqMsgT()) @@ -185,7 +184,7 @@ public int Send(byte[] buffer, int size, int flags) Marshal.Copy(buffer, 0, message.Data(), size); } - int bytesSent = RetryIfInterrupted(() => LibZmq.zmq_msg_send(message, SocketHandle, flags)); + int bytesSent = MethodHelper.RetryIfInterrupted(LibZmq.zmq_msg_send.Invoke, message.Ptr, SocketHandle, flags); if (bytesSent == 0 && LibZmq.MajorVersion < 3) { @@ -249,7 +248,7 @@ public int GetSocketOption(int option, out int value) { Marshal.WriteInt32(optionLength, sizeof(int)); - int rc = RetryIfInterrupted(() => LibZmq.zmq_getsockopt(SocketHandle, option, optionValue, optionLength)); + int rc = MethodHelper.RetryIfInterrupted(LibZmq.zmq_getsockopt.Invoke, SocketHandle, option, optionValue.Ptr, optionLength.Ptr); value = Marshal.ReadInt32(optionValue); return rc; @@ -263,7 +262,7 @@ public int GetSocketOption(int option, out long value) { Marshal.WriteInt32(optionLength, sizeof(long)); - int rc = RetryIfInterrupted(() => LibZmq.zmq_getsockopt(SocketHandle, option, optionValue, optionLength)); + int rc = MethodHelper.RetryIfInterrupted(LibZmq.zmq_getsockopt.Invoke, SocketHandle, option, optionValue.Ptr, optionLength.Ptr); value = Marshal.ReadInt64(optionValue); return rc; @@ -277,7 +276,7 @@ public int GetSocketOption(int option, out ulong value) { Marshal.WriteInt32(optionLength, sizeof(ulong)); - int rc = RetryIfInterrupted(() => LibZmq.zmq_getsockopt(SocketHandle, option, optionValue, optionLength)); + int rc = MethodHelper.RetryIfInterrupted(LibZmq.zmq_getsockopt.Invoke, SocketHandle, option, optionValue.Ptr, optionLength.Ptr); value = unchecked(Convert.ToUInt64(Marshal.ReadInt64(optionValue))); return rc; @@ -291,7 +290,7 @@ public int GetSocketOption(int option, out byte[] value) { Marshal.WriteInt32(optionLength, MaxBinaryOptionSize); - int rc = RetryIfInterrupted(() => LibZmq.zmq_getsockopt(SocketHandle, option, optionValue, optionLength)); + int rc = MethodHelper.RetryIfInterrupted(LibZmq.zmq_getsockopt.Invoke, SocketHandle, option, optionValue.Ptr, optionLength.Ptr); value = new byte[Marshal.ReadInt32(optionLength)]; Marshal.Copy(optionValue, value, 0, value.Length); @@ -307,7 +306,7 @@ public int GetSocketOption(int option, out string value) { Marshal.WriteInt32(optionLength, MaxBinaryOptionSize); - int rc = RetryIfInterrupted(() => LibZmq.zmq_getsockopt(SocketHandle, option, optionValue, optionLength)); + int rc = MethodHelper.RetryIfInterrupted(LibZmq.zmq_getsockopt.Invoke, SocketHandle, option, optionValue.Ptr, optionLength.Ptr); value = rc == 0 ? Marshal.PtrToStringAnsi(optionValue) : string.Empty; @@ -319,7 +318,7 @@ public int SetSocketOption(int option, string value) { if (value == null) { - return RetryIfInterrupted(() => LibZmq.zmq_setsockopt(SocketHandle, option, IntPtr.Zero, 0)); + return MethodHelper.RetryIfInterrupted(LibZmq.zmq_setsockopt.Invoke, SocketHandle, option, IntPtr.Zero, 0); } var encoded = System.Text.Encoding.ASCII.GetBytes(value + "\x0"); @@ -327,7 +326,7 @@ public int SetSocketOption(int option, string value) { Marshal.Copy(encoded, 0, optionValue, encoded.Length); - return RetryIfInterrupted(() => LibZmq.zmq_setsockopt(SocketHandle, option, optionValue, value.Length)); + return MethodHelper.RetryIfInterrupted(LibZmq.zmq_setsockopt.Invoke, SocketHandle, option, optionValue.Ptr, value.Length); } } @@ -337,7 +336,7 @@ public int SetSocketOption(int option, int value) { Marshal.WriteInt32(optionValue, value); - return RetryIfInterrupted(() => LibZmq.zmq_setsockopt(SocketHandle, option, optionValue, sizeof(int))); + return MethodHelper.RetryIfInterrupted(LibZmq.zmq_setsockopt.Invoke, SocketHandle, option, optionValue.Ptr, sizeof(int)); } } @@ -347,7 +346,7 @@ public int SetSocketOption(int option, long value) { Marshal.WriteInt64(optionValue, value); - return RetryIfInterrupted(() => LibZmq.zmq_setsockopt(SocketHandle, option, optionValue, sizeof(long))); + return MethodHelper.RetryIfInterrupted(LibZmq.zmq_setsockopt.Invoke, SocketHandle, option, optionValue.Ptr, sizeof(long)); } } @@ -357,7 +356,7 @@ public int SetSocketOption(int option, ulong value) { Marshal.WriteInt64(optionValue, unchecked(Convert.ToInt64(value))); - return RetryIfInterrupted(() => LibZmq.zmq_setsockopt(SocketHandle, option, optionValue, sizeof(ulong))); + return MethodHelper.RetryIfInterrupted(LibZmq.zmq_setsockopt.Invoke, SocketHandle, option, optionValue.Ptr, sizeof(ulong)); } } @@ -367,7 +366,7 @@ public int SetSocketOption(int option, byte[] value) { Marshal.Copy(value, 0, optionValue, value.Length); - return RetryIfInterrupted(() => LibZmq.zmq_setsockopt(SocketHandle, option, optionValue, value.Length)); + return MethodHelper.RetryIfInterrupted(LibZmq.zmq_setsockopt.Invoke, SocketHandle, option, optionValue.Ptr, value.Length); } } @@ -390,19 +389,6 @@ protected virtual void Dispose(bool disposing) _disposed = true; } - private static int RetryIfInterrupted(Func func) - { - int rc; - - do - { - rc = func(); - } - while (rc == -1 && LibZmq.zmq_errno() == ErrorCode.EINTR); - - return rc; - } - private int GetReceiveMore(out int receiveMore) { if (LibZmq.MajorVersion >= 3) @@ -417,6 +403,4 @@ private int GetReceiveMore(out int receiveMore) return rc; } } - - // ReSharper restore AccessToDisposedClosure } diff --git a/src/ZeroMQ/ZeroMQ.csproj b/src/ZeroMQ/ZeroMQ.csproj index fdce615..df30a9a 100644 --- a/src/ZeroMQ/ZeroMQ.csproj +++ b/src/ZeroMQ/ZeroMQ.csproj @@ -56,6 +56,7 @@ + From 18e941728d5d7d03181b94c994329830960457f9 Mon Sep 17 00:00:00 2001 From: John Gozde Date: Sun, 14 Oct 2012 19:55:56 -0600 Subject: [PATCH 2/3] Reduce memory usage and GC pressure in interop. - Refactor DisposableIntPtr as a struct to keep instances off the heap. - Refactor ZmqMsgT and its usage: - Class now stores a DisposableIntPtr instead of inheriting from it - Only a single instance of ZmqMsgT is ever instantiated and used per ZmqSocket, greatly reducing memory allocations over the lifetime of a socket. This is allowable because Sockets are not thread-safe. - Note that the ZmqMsgT instance is still initialized and closed on each send/receive operation, as required by ZMQ. All that has been reduced are memory allocations for zmq_msg_t structures. --- src/ZeroMQ/Interop/DisposableIntPtr.cs | 17 +-- src/ZeroMQ/Interop/SocketProxy.cs | 189 ++++++++++++------------- src/ZeroMQ/Interop/ZmqMsgT.cs | 37 +++-- 3 files changed, 120 insertions(+), 123 deletions(-) diff --git a/src/ZeroMQ/Interop/DisposableIntPtr.cs b/src/ZeroMQ/Interop/DisposableIntPtr.cs index 706c4f0..1c69f8a 100644 --- a/src/ZeroMQ/Interop/DisposableIntPtr.cs +++ b/src/ZeroMQ/Interop/DisposableIntPtr.cs @@ -3,32 +3,21 @@ using System; using System.Runtime.InteropServices; - internal class DisposableIntPtr : IDisposable + internal struct DisposableIntPtr : IDisposable { + public IntPtr Ptr; + public DisposableIntPtr(int size) { Ptr = Marshal.AllocHGlobal(size); } - ~DisposableIntPtr() - { - Dispose(false); - } - - public IntPtr Ptr { get; private set; } - public static implicit operator IntPtr(DisposableIntPtr disposablePtr) { return disposablePtr.Ptr; } public void Dispose() - { - Dispose(true); - GC.SuppressFinalize(this); - } - - protected virtual void Dispose(bool disposing) { if (Ptr != IntPtr.Zero) { diff --git a/src/ZeroMQ/Interop/SocketProxy.cs b/src/ZeroMQ/Interop/SocketProxy.cs index 00086fc..7f78bb2 100644 --- a/src/ZeroMQ/Interop/SocketProxy.cs +++ b/src/ZeroMQ/Interop/SocketProxy.cs @@ -10,8 +10,8 @@ internal class SocketProxy : IDisposable // From options.hpp: unsigned char identity [256]; private const int MaxBinaryOptionSize = 255; - private readonly IntPtr _receiveIntPtrBuffer; - private readonly IntPtr _sendIntPtrBuffer; + private readonly ZmqMsgT _msg; + private readonly IntPtr _buffer; private readonly Action _socketClosed; @@ -32,8 +32,8 @@ public SocketProxy(IntPtr socketHandle, Action socketClosed) SocketHandle = socketHandle; _socketClosed = socketClosed; - _receiveIntPtrBuffer = Marshal.AllocHGlobal(MaxBufferSize); - _sendIntPtrBuffer = Marshal.AllocHGlobal(MaxBufferSize); + _msg = new ZmqMsgT(); + _buffer = Marshal.AllocHGlobal(MaxBufferSize); } ~SocketProxy() @@ -81,84 +81,80 @@ public int Close() public int Receive(byte[] buffer, int flags) { + int bytesReceived; + // Use zmq_buffer_recv method if appropriate -> results in fewer P/Invoke calls if (buffer.Length <= MaxBufferSize && LibZmq.zmq_buffer_recv != null) { - int bytesReceived = MethodHelper.RetryIfInterrupted(LibZmq.zmq_buffer_recv.Invoke, SocketHandle, _receiveIntPtrBuffer, MaxBufferSize, flags); + bytesReceived = MethodHelper.RetryIfInterrupted(LibZmq.zmq_buffer_recv.Invoke, SocketHandle, _buffer, MaxBufferSize, flags); int size = Math.Min(buffer.Length, bytesReceived); - Marshal.Copy(_receiveIntPtrBuffer, buffer, 0, size); + Marshal.Copy(_buffer, buffer, 0, size); return size; } - using (var message = new ZmqMsgT()) + if (_msg.Init(buffer.Length) == -1) { - if (message.Init(buffer.Length) == -1) - { - return -1; - } + return -1; + } - int bytesReceived = MethodHelper.RetryIfInterrupted(LibZmq.zmq_msg_recv.Invoke, message.Ptr, SocketHandle, flags); + bytesReceived = MethodHelper.RetryIfInterrupted(LibZmq.zmq_msg_recv.Invoke, _msg.Ptr, SocketHandle, flags); - if (bytesReceived == 0 && LibZmq.MajorVersion < 3) - { - // 0MQ 2.x does not return number of bytes received - bytesReceived = message.Size(); - } - - if (bytesReceived > 0) - { - Marshal.Copy(message.Data(), buffer, 0, bytesReceived); - } + if (bytesReceived == 0 && LibZmq.MajorVersion < 3) + { + // 0MQ 2.x does not return number of bytes received + bytesReceived = _msg.Size(); + } - if (message.Close() == -1) - { - return -1; - } + if (bytesReceived > 0) + { + Marshal.Copy(_msg.Data(), buffer, 0, bytesReceived); + } - return bytesReceived; + if (_msg.Close() == -1) + { + return -1; } + + return bytesReceived; } public byte[] Receive(byte[] buffer, int flags, out int size) { size = -1; - using (var message = new ZmqMsgT()) + if (_msg.Init() == -1) { - if (message.Init() == -1) - { - return buffer; - } + return buffer; + } - int bytesReceived = MethodHelper.RetryIfInterrupted(LibZmq.zmq_msg_recv.Invoke, message.Ptr, SocketHandle, flags); + int bytesReceived = MethodHelper.RetryIfInterrupted(LibZmq.zmq_msg_recv.Invoke, _msg.Ptr, SocketHandle, flags); - if (bytesReceived >= 0) + if (bytesReceived >= 0) + { + if (bytesReceived == 0 && LibZmq.MajorVersion < 3) { - if (bytesReceived == 0 && LibZmq.MajorVersion < 3) - { - // 0MQ 2.x does not return number of bytes received - bytesReceived = message.Size(); - } - - size = bytesReceived; - - if (buffer == null || size > buffer.Length) - { - buffer = new byte[size]; - } - - Marshal.Copy(message.Data(), buffer, 0, size); + // 0MQ 2.x does not return number of bytes received + bytesReceived = _msg.Size(); } - if (message.Close() == -1) + size = bytesReceived; + + if (buffer == null || size > buffer.Length) { - size = -1; + buffer = new byte[size]; } - return buffer; + Marshal.Copy(_msg.Data(), buffer, 0, size); } + + if (_msg.Close() == -1) + { + size = -1; + } + + return buffer; } public int Send(byte[] buffer, int size, int flags) @@ -167,78 +163,72 @@ public int Send(byte[] buffer, int size, int flags) if (buffer.Length <= MaxBufferSize && LibZmq.zmq_buffer_send != null) { int sizeToSend = Math.Min(size, MaxBufferSize); - Marshal.Copy(buffer, 0, _sendIntPtrBuffer, sizeToSend); + Marshal.Copy(buffer, 0, _buffer, sizeToSend); - return MethodHelper.RetryIfInterrupted(LibZmq.zmq_buffer_send.Invoke, SocketHandle, _sendIntPtrBuffer, sizeToSend, flags); + return MethodHelper.RetryIfInterrupted(LibZmq.zmq_buffer_send.Invoke, SocketHandle, _buffer, sizeToSend, flags); } - using (var message = new ZmqMsgT()) + if (_msg.Init(size) == -1) { - if (message.Init(size) == -1) - { - return -1; - } - - if (size > 0) - { - Marshal.Copy(buffer, 0, message.Data(), size); - } + return -1; + } - int bytesSent = MethodHelper.RetryIfInterrupted(LibZmq.zmq_msg_send.Invoke, message.Ptr, SocketHandle, flags); + if (size > 0) + { + Marshal.Copy(buffer, 0, _msg.Data(), size); + } - if (bytesSent == 0 && LibZmq.MajorVersion < 3) - { - // 0MQ 2.x does not report number of bytes sent, so this may be inaccurate/misleading - bytesSent = size; - } + int bytesSent = MethodHelper.RetryIfInterrupted(LibZmq.zmq_msg_send.Invoke, _msg.Ptr, SocketHandle, flags); - if (message.Close() == -1) - { - return -1; - } + if (bytesSent == 0 && LibZmq.MajorVersion < 3) + { + // 0MQ 2.x does not report number of bytes sent, so this may be inaccurate/misleading + bytesSent = size; + } - return bytesSent; + if (_msg.Close() == -1) + { + return -1; } + + return bytesSent; } public int Forward(IntPtr destinationHandle) { - using (var message = new ZmqMsgT()) + if (_msg.Init() == -1) + { + return -1; + } + + int receiveMore; + int bytesSent; + + do { - if (message.Init() == -1) + if (LibZmq.zmq_msg_recv(_msg, SocketHandle, 0) == -1) { return -1; } - int receiveMore; - int bytesSent; - - do + if (GetReceiveMore(out receiveMore) == -1) { - if (LibZmq.zmq_msg_recv(message, SocketHandle, 0) == -1) - { - return -1; - } - - if (GetReceiveMore(out receiveMore) == -1) - { - return -1; - } - - if ((bytesSent = LibZmq.zmq_msg_send(message, destinationHandle, receiveMore == 1 ? (int)SocketFlags.SendMore : 0)) == -1) - { - return -1; - } + return -1; } - while (receiveMore == 1); - if (message.Close() == -1) + if ((bytesSent = LibZmq.zmq_msg_send(_msg, destinationHandle, receiveMore == 1 ? (int)SocketFlags.SendMore : 0)) == -1) { return -1; } + } + while (receiveMore == 1); - return bytesSent; + if (_msg.Close() == -1) + { + return -1; } + + return bytesSent; } public int GetSocketOption(int option, out int value) @@ -378,10 +368,11 @@ public virtual void Dispose() protected virtual void Dispose(bool disposing) { + _msg.Dispose(disposing); + if (!_disposed && disposing) { - Marshal.FreeHGlobal(_receiveIntPtrBuffer); - Marshal.FreeHGlobal(_sendIntPtrBuffer); + Marshal.FreeHGlobal(_buffer); Close(); } diff --git a/src/ZeroMQ/Interop/ZmqMsgT.cs b/src/ZeroMQ/Interop/ZmqMsgT.cs index 0f8db38..663de1c 100644 --- a/src/ZeroMQ/Interop/ZmqMsgT.cs +++ b/src/ZeroMQ/Interop/ZmqMsgT.cs @@ -2,23 +2,34 @@ { using System; - internal class ZmqMsgT : DisposableIntPtr + internal sealed class ZmqMsgT : IDisposable { + private DisposableIntPtr _ptr; private bool _initialized; public ZmqMsgT() - : base(LibZmq.ZmqMsgTSize) { + _ptr = new DisposableIntPtr(LibZmq.ZmqMsgTSize); + } + + ~ZmqMsgT() + { + Dispose(false); + } + + public IntPtr Ptr + { + get { return _ptr.Ptr; } } public static implicit operator IntPtr(ZmqMsgT msg) { - return msg.Ptr; + return msg._ptr; } public int Init() { - int rc = LibZmq.zmq_msg_init(Ptr); + int rc = LibZmq.zmq_msg_init(_ptr); _initialized = true; @@ -27,7 +38,7 @@ public int Init() public int Init(int size) { - int rc = LibZmq.zmq_msg_init_size(Ptr, size); + int rc = LibZmq.zmq_msg_init_size(_ptr, size); _initialized = true; @@ -36,7 +47,7 @@ public int Init(int size) public int Close() { - int rc = LibZmq.zmq_msg_close(Ptr); + int rc = LibZmq.zmq_msg_close(_ptr); _initialized = false; @@ -45,22 +56,28 @@ public int Close() public int Size() { - return LibZmq.zmq_msg_size(Ptr); + return LibZmq.zmq_msg_size(_ptr); } public IntPtr Data() { - return LibZmq.zmq_msg_data(Ptr); + return LibZmq.zmq_msg_data(_ptr); + } + + public void Dispose() + { + Dispose(true); + GC.SuppressFinalize(this); } - protected override void Dispose(bool disposing) + internal void Dispose(bool disposing) { if (disposing && _initialized) { Close(); } - base.Dispose(disposing); + _ptr.Dispose(); } } } From 76397c9f3aafa689c038e0a74ae204d28e92cbde Mon Sep 17 00:00:00 2001 From: John Gozde Date: Sun, 14 Oct 2012 20:07:56 -0600 Subject: [PATCH 3/3] Rename MethodHelper to Retry. --- .../Interop/{MethodHelper.cs => Retry.cs} | 6 ++-- src/ZeroMQ/Interop/SocketProxy.cs | 32 +++++++++---------- src/ZeroMQ/ZeroMQ.csproj | 2 +- 3 files changed, 20 insertions(+), 20 deletions(-) rename src/ZeroMQ/Interop/{MethodHelper.cs => Retry.cs} (64%) diff --git a/src/ZeroMQ/Interop/MethodHelper.cs b/src/ZeroMQ/Interop/Retry.cs similarity index 64% rename from src/ZeroMQ/Interop/MethodHelper.cs rename to src/ZeroMQ/Interop/Retry.cs index 81d3982..dbdf71d 100644 --- a/src/ZeroMQ/Interop/MethodHelper.cs +++ b/src/ZeroMQ/Interop/Retry.cs @@ -2,9 +2,9 @@ { using System; - internal static class MethodHelper + internal static class Retry { - public static int RetryIfInterrupted(Func operation, T1 arg1, T2 arg2, T3 arg3) + public static int IfInterrupted(Func operation, T1 arg1, T2 arg2, T3 arg3) { int rc; @@ -17,7 +17,7 @@ internal static class MethodHelper return rc; } - public static int RetryIfInterrupted(Func operation, T1 arg1, T2 arg2, T3 arg3, T4 arg4) + public static int IfInterrupted(Func operation, T1 arg1, T2 arg2, T3 arg3, T4 arg4) { int rc; diff --git a/src/ZeroMQ/Interop/SocketProxy.cs b/src/ZeroMQ/Interop/SocketProxy.cs index 7f78bb2..12597c3 100644 --- a/src/ZeroMQ/Interop/SocketProxy.cs +++ b/src/ZeroMQ/Interop/SocketProxy.cs @@ -86,7 +86,7 @@ public int Receive(byte[] buffer, int flags) // Use zmq_buffer_recv method if appropriate -> results in fewer P/Invoke calls if (buffer.Length <= MaxBufferSize && LibZmq.zmq_buffer_recv != null) { - bytesReceived = MethodHelper.RetryIfInterrupted(LibZmq.zmq_buffer_recv.Invoke, SocketHandle, _buffer, MaxBufferSize, flags); + bytesReceived = Retry.IfInterrupted(LibZmq.zmq_buffer_recv.Invoke, SocketHandle, _buffer, MaxBufferSize, flags); int size = Math.Min(buffer.Length, bytesReceived); Marshal.Copy(_buffer, buffer, 0, size); @@ -99,7 +99,7 @@ public int Receive(byte[] buffer, int flags) return -1; } - bytesReceived = MethodHelper.RetryIfInterrupted(LibZmq.zmq_msg_recv.Invoke, _msg.Ptr, SocketHandle, flags); + bytesReceived = Retry.IfInterrupted(LibZmq.zmq_msg_recv.Invoke, _msg.Ptr, SocketHandle, flags); if (bytesReceived == 0 && LibZmq.MajorVersion < 3) { @@ -129,7 +129,7 @@ public byte[] Receive(byte[] buffer, int flags, out int size) return buffer; } - int bytesReceived = MethodHelper.RetryIfInterrupted(LibZmq.zmq_msg_recv.Invoke, _msg.Ptr, SocketHandle, flags); + int bytesReceived = Retry.IfInterrupted(LibZmq.zmq_msg_recv.Invoke, _msg.Ptr, SocketHandle, flags); if (bytesReceived >= 0) { @@ -165,7 +165,7 @@ public int Send(byte[] buffer, int size, int flags) int sizeToSend = Math.Min(size, MaxBufferSize); Marshal.Copy(buffer, 0, _buffer, sizeToSend); - return MethodHelper.RetryIfInterrupted(LibZmq.zmq_buffer_send.Invoke, SocketHandle, _buffer, sizeToSend, flags); + return Retry.IfInterrupted(LibZmq.zmq_buffer_send.Invoke, SocketHandle, _buffer, sizeToSend, flags); } if (_msg.Init(size) == -1) @@ -178,7 +178,7 @@ public int Send(byte[] buffer, int size, int flags) Marshal.Copy(buffer, 0, _msg.Data(), size); } - int bytesSent = MethodHelper.RetryIfInterrupted(LibZmq.zmq_msg_send.Invoke, _msg.Ptr, SocketHandle, flags); + int bytesSent = Retry.IfInterrupted(LibZmq.zmq_msg_send.Invoke, _msg.Ptr, SocketHandle, flags); if (bytesSent == 0 && LibZmq.MajorVersion < 3) { @@ -238,7 +238,7 @@ public int GetSocketOption(int option, out int value) { Marshal.WriteInt32(optionLength, sizeof(int)); - int rc = MethodHelper.RetryIfInterrupted(LibZmq.zmq_getsockopt.Invoke, SocketHandle, option, optionValue.Ptr, optionLength.Ptr); + int rc = Retry.IfInterrupted(LibZmq.zmq_getsockopt.Invoke, SocketHandle, option, optionValue.Ptr, optionLength.Ptr); value = Marshal.ReadInt32(optionValue); return rc; @@ -252,7 +252,7 @@ public int GetSocketOption(int option, out long value) { Marshal.WriteInt32(optionLength, sizeof(long)); - int rc = MethodHelper.RetryIfInterrupted(LibZmq.zmq_getsockopt.Invoke, SocketHandle, option, optionValue.Ptr, optionLength.Ptr); + int rc = Retry.IfInterrupted(LibZmq.zmq_getsockopt.Invoke, SocketHandle, option, optionValue.Ptr, optionLength.Ptr); value = Marshal.ReadInt64(optionValue); return rc; @@ -266,7 +266,7 @@ public int GetSocketOption(int option, out ulong value) { Marshal.WriteInt32(optionLength, sizeof(ulong)); - int rc = MethodHelper.RetryIfInterrupted(LibZmq.zmq_getsockopt.Invoke, SocketHandle, option, optionValue.Ptr, optionLength.Ptr); + int rc = Retry.IfInterrupted(LibZmq.zmq_getsockopt.Invoke, SocketHandle, option, optionValue.Ptr, optionLength.Ptr); value = unchecked(Convert.ToUInt64(Marshal.ReadInt64(optionValue))); return rc; @@ -280,7 +280,7 @@ public int GetSocketOption(int option, out byte[] value) { Marshal.WriteInt32(optionLength, MaxBinaryOptionSize); - int rc = MethodHelper.RetryIfInterrupted(LibZmq.zmq_getsockopt.Invoke, SocketHandle, option, optionValue.Ptr, optionLength.Ptr); + int rc = Retry.IfInterrupted(LibZmq.zmq_getsockopt.Invoke, SocketHandle, option, optionValue.Ptr, optionLength.Ptr); value = new byte[Marshal.ReadInt32(optionLength)]; Marshal.Copy(optionValue, value, 0, value.Length); @@ -296,7 +296,7 @@ public int GetSocketOption(int option, out string value) { Marshal.WriteInt32(optionLength, MaxBinaryOptionSize); - int rc = MethodHelper.RetryIfInterrupted(LibZmq.zmq_getsockopt.Invoke, SocketHandle, option, optionValue.Ptr, optionLength.Ptr); + int rc = Retry.IfInterrupted(LibZmq.zmq_getsockopt.Invoke, SocketHandle, option, optionValue.Ptr, optionLength.Ptr); value = rc == 0 ? Marshal.PtrToStringAnsi(optionValue) : string.Empty; @@ -308,7 +308,7 @@ public int SetSocketOption(int option, string value) { if (value == null) { - return MethodHelper.RetryIfInterrupted(LibZmq.zmq_setsockopt.Invoke, SocketHandle, option, IntPtr.Zero, 0); + return Retry.IfInterrupted(LibZmq.zmq_setsockopt.Invoke, SocketHandle, option, IntPtr.Zero, 0); } var encoded = System.Text.Encoding.ASCII.GetBytes(value + "\x0"); @@ -316,7 +316,7 @@ public int SetSocketOption(int option, string value) { Marshal.Copy(encoded, 0, optionValue, encoded.Length); - return MethodHelper.RetryIfInterrupted(LibZmq.zmq_setsockopt.Invoke, SocketHandle, option, optionValue.Ptr, value.Length); + return Retry.IfInterrupted(LibZmq.zmq_setsockopt.Invoke, SocketHandle, option, optionValue.Ptr, value.Length); } } @@ -326,7 +326,7 @@ public int SetSocketOption(int option, int value) { Marshal.WriteInt32(optionValue, value); - return MethodHelper.RetryIfInterrupted(LibZmq.zmq_setsockopt.Invoke, SocketHandle, option, optionValue.Ptr, sizeof(int)); + return Retry.IfInterrupted(LibZmq.zmq_setsockopt.Invoke, SocketHandle, option, optionValue.Ptr, sizeof(int)); } } @@ -336,7 +336,7 @@ public int SetSocketOption(int option, long value) { Marshal.WriteInt64(optionValue, value); - return MethodHelper.RetryIfInterrupted(LibZmq.zmq_setsockopt.Invoke, SocketHandle, option, optionValue.Ptr, sizeof(long)); + return Retry.IfInterrupted(LibZmq.zmq_setsockopt.Invoke, SocketHandle, option, optionValue.Ptr, sizeof(long)); } } @@ -346,7 +346,7 @@ public int SetSocketOption(int option, ulong value) { Marshal.WriteInt64(optionValue, unchecked(Convert.ToInt64(value))); - return MethodHelper.RetryIfInterrupted(LibZmq.zmq_setsockopt.Invoke, SocketHandle, option, optionValue.Ptr, sizeof(ulong)); + return Retry.IfInterrupted(LibZmq.zmq_setsockopt.Invoke, SocketHandle, option, optionValue.Ptr, sizeof(ulong)); } } @@ -356,7 +356,7 @@ public int SetSocketOption(int option, byte[] value) { Marshal.Copy(value, 0, optionValue, value.Length); - return MethodHelper.RetryIfInterrupted(LibZmq.zmq_setsockopt.Invoke, SocketHandle, option, optionValue.Ptr, value.Length); + return Retry.IfInterrupted(LibZmq.zmq_setsockopt.Invoke, SocketHandle, option, optionValue.Ptr, value.Length); } } diff --git a/src/ZeroMQ/ZeroMQ.csproj b/src/ZeroMQ/ZeroMQ.csproj index df30a9a..51b43d4 100644 --- a/src/ZeroMQ/ZeroMQ.csproj +++ b/src/ZeroMQ/ZeroMQ.csproj @@ -56,7 +56,7 @@ - +