diff --git a/src/Core/Context.cs b/src/Core/Context.cs new file mode 100644 index 0000000..ba56317 --- /dev/null +++ b/src/Core/Context.cs @@ -0,0 +1,215 @@ +namespace CrossroadsIO +{ + using System; + using System.Text; + + using CrossroadsIO.Interop; + + /// + /// Creates Crossroads I/O instances within a process boundary. + /// + /// + /// The object is a container for all sockets in a single process, + /// and acts as the transport for inproc sockets. is thread safe. + /// A must not be terminated until all spawned sockets have been + /// successfully closed. + /// + public class Context : IDisposable + { + private readonly ContextProxy _contextProxy; + + private int _threadPoolSize = 1; + private int _maxSockets = 512; + private bool _disposed; + + static Context() + { + DefaultEncoding = Encoding.UTF8; + } + + internal Context(ContextProxy contextProxy) + { + if (contextProxy == null) + { + throw new ArgumentNullException("contextProxy"); + } + + _contextProxy = contextProxy; + } + + /// + /// Finalizes an instance of the class. + /// + ~Context() + { + Dispose(false); + } + + /// + /// Gets or sets the default encoding for all sockets in the current process. + /// + public static Encoding DefaultEncoding { get; set; } + + /// + /// Gets or sets the maximum number of sockets can be simultaneously active in the current context. (Default = 512). + /// + public int MaxSockets + { + get { return _maxSockets; } + set { SetContextOption(ContextOption.MAX_SOCKETS, (_maxSockets = value)); } + } + + /// + /// Gets or sets the size of the thread pool for the current context. (Default = 1) + /// + public int ThreadPoolSize + { + get { return _threadPoolSize; } + set { SetContextOption(ContextOption.IO_THREADS, (_threadPoolSize = value)); } + } + + /// + /// Create a instance. + /// + /// A new instance. + public static Context Create() + { + var contextProxy = new ContextProxy(); + + if (contextProxy.Initialize() == -1) + { + throw new XsException(ErrorProxy.GetLastError()); + } + + return new Context(contextProxy); + } + + /// + /// Create a socket with the current context and the specified socket type. + /// + /// A value for the socket. + /// A instance with the current context and the specified socket type. + public Socket CreateSocket(SocketType socketType) + { + switch (socketType) + { + case SocketType.REQ: + case SocketType.REP: + case SocketType.XREQ: + case SocketType.XREP: + case SocketType.XPUB: + case SocketType.PAIR: + return CreateSocket(sp => new DuplexSocket(sp, socketType), socketType); + + case SocketType.PUSH: + case SocketType.PUB: + return CreateSocket(sp => new SendSocket(sp, socketType), socketType); + + case SocketType.PULL: + return CreateSocket(sp => new ReceiveSocket(sp, socketType), socketType); + + case SocketType.SUB: + return CreateSocket(sp => new SubscribeSocket(sp, socketType), socketType); + + case SocketType.XSUB: + return CreateSocket(sp => new SubscribeExtSocket(sp, socketType), socketType); + } + + throw new InvalidOperationException("Invalid socket type specified: " + socketType); + } + + /// + /// Terminate the Crossroads I/O context. + /// + /// + /// Context termination is performed in the following steps: + /// + /// + /// The has already been disposed. + /// An error occurred creating the socket. + public void Terminate() + { + EnsureNotDisposed(); + + _contextProxy.Terminate(); + } + + /// + /// Releases all resources used by the current instance of the class. + /// + public virtual void Dispose() + { + Dispose(true); + GC.SuppressFinalize(this); + } + + /// + /// Releases the unmanaged resources used by the , and optionally disposes of the managed resources. + /// + /// true to release both managed and unmanaged resources; false to release only unmanaged resources. + protected virtual void Dispose(bool disposing) + { + if (!_disposed) + { + if (disposing) + { + _contextProxy.Dispose(); + } + } + + _disposed = true; + } + + private TSocket CreateSocket(Func constructor, SocketType socketType) + { + EnsureNotDisposed(); + + IntPtr socketHandle = _contextProxy.CreateSocket((int)socketType); + + if (socketHandle == IntPtr.Zero) + { + throw new XsException(ErrorProxy.GetLastError()); + } + + return constructor(new SocketProxy(socketHandle)); + } + + private void SetContextOption(ContextOption option, int value) + { + EnsureNotDisposed(); + + if (_contextProxy.SetContextOption((int)option, value) == -1) + { + throw new XsSocketException(ErrorProxy.GetLastError()); + } + } + + private void EnsureNotDisposed() + { + if (_disposed) + { + throw new ObjectDisposedException(GetType().FullName); + } + } + } +} diff --git a/src/Core/ContextOption.cs b/src/Core/ContextOption.cs new file mode 100644 index 0000000..579c8af --- /dev/null +++ b/src/Core/ContextOption.cs @@ -0,0 +1,8 @@ +namespace CrossroadsIO +{ + internal enum ContextOption + { + MAX_SOCKETS = 1, + IO_THREADS = 2, + } +} diff --git a/src/Core/Core.csproj b/src/Core/Core.csproj index 7dcea28..04fa06f 100644 --- a/src/Core/Core.csproj +++ b/src/Core/Core.csproj @@ -18,7 +18,7 @@ full false bin\Debug\ - DEBUG;TRACE + TRACE;DEBUG;WINDOWS;DOTNET prompt 4 @@ -26,7 +26,7 @@ pdbonly true bin\Release\ - TRACE + TRACE;WINDOWS;DOTNET prompt 4 @@ -40,8 +40,60 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + TDelegate. + return (TDelegate)(object)Marshal.GetDelegateForFunctionPointer(p, typeof(TDelegate)); + } + + public void Dispose() + { + if (!_handle.IsClosed) + { + _handle.Close(); + } + } + + private static SafeLibraryHandle NullifyInvalidHandle(SafeLibraryHandle handle) + { + return handle.IsInvalid ? null : handle; + } + + private SafeLibraryHandle LoadFromSystemPath() + { + return NullifyInvalidHandle(Platform.OpenHandle(_systemFileName)); + } + + private SafeLibraryHandle LoadFromLocalBinPath() + { + return Directory.Exists("bin") ? ExtractAndLoadFromPath("bin") : null; + } + + private SafeLibraryHandle LoadFromExecutingPath() + { + return ExtractAndLoadFromPath("."); + } + + private SafeLibraryHandle LoadFromTempPath() + { + string dir = Path.Combine(Path.GetTempPath(), Assembly.GetExecutingAssembly().FullName, CurrentArch); + Directory.CreateDirectory(dir); + + return ExtractAndLoadFromPath(dir); + } + + private SafeLibraryHandle ExtractAndLoadFromPath(string dir) + { + string libPath = Path.GetFullPath(Path.Combine(dir, _extractedFileName)); + string platformSuffix = "." + CurrentArch; + + if (!ManifestResource.Extract(_systemFileName + platformSuffix, libPath)) + { + return null; + } + + return NullifyInvalidHandle(Platform.OpenHandle(libPath)); + } + } +} diff --git a/src/Core/Message.cs b/src/Core/Message.cs new file mode 100644 index 0000000..e6b273c --- /dev/null +++ b/src/Core/Message.cs @@ -0,0 +1,188 @@ +namespace CrossroadsIO +{ + using System; + using System.Collections; + using System.Collections.Generic; + using System.Linq; + + /// + /// A single or multi-part message sent or received via a . + /// + public class Message : IEnumerable + { + private readonly List _frames; + + /// + /// Initializes a new instance of the class. + /// Creates an empty message. + /// + public Message() + { + _frames = new List(); + } + + /// + /// Initializes a new instance of the class. + /// Creates a message that contains the given objects. + /// + /// A collection of objects to be stored by this . + /// is null. + public Message(IEnumerable frames) + { + if (frames == null) + { + throw new ArgumentNullException("frames"); + } + + _frames = new List(frames); + + NormalizeFrames(); + } + + /// + /// Initializes a new instance of the class. + /// Creates a message that contains the given arrays converted to s. + /// + /// A collection of arrays to be stored by this . + /// is null. + public Message(IEnumerable buffers) + { + if (buffers == null) + { + throw new ArgumentNullException("buffers"); + } + + _frames = buffers.Select(buf => new Frame(buf)).ToList(); + + NormalizeFrames(); + } + + /// + /// Gets a value indicating whether the current message is complete + /// (i.e. no more message parts follow the last part of this message). + /// + public bool IsComplete + { + get { return _frames.Count > 0 && !_frames.Last().HasMore; } + } + + /// + /// Gets a value indicating whether the current message is empty. + /// + public bool IsEmpty + { + get { return _frames.Count == 0; } + } + + /// + /// Gets the number of objects contained by this message. + /// + public int FrameCount + { + get { return _frames.Count; } + } + + /// + /// Gets the total number of bytes in this message. + /// + public int TotalSize + { + get { return _frames.Sum(f => f.MessageSize); } + } + + /// + /// Gets the at the specified index. + /// + /// The zero-based index of the to get. + /// The at the specified index. + /// + /// is less than 0 -or- is equal to or greater than . + /// + public Frame this[int index] + { + get { return _frames[index]; } + } + + /// + /// Adds the given to the end of the current . + /// + /// + /// Updates the property of the preceding frames accordingly. + /// + /// A object to append to this . + /// is null. + public void Append(Frame frame) + { + AppendShallowCopy(frame); + + NormalizeFrames(); + } + + /// + /// Adds the given array to the end of the current + /// as a . + /// + /// + /// Updates the property of the preceding frames accordingly. + /// + /// A array to append to this . + /// is null. + public void Append(byte[] buffer) + { + _frames.Add(new Frame(buffer)); + + NormalizeFrames(); + } + + /// + /// Adds an empty to the end of the current . + /// + /// + /// Updates the property of the preceding frames accordingly. + /// + public void AppendEmptyFrame() + { + _frames.Add(new Frame(0)); + + NormalizeFrames(); + } + + /// + /// Returns an enumerator that iterates through the objects + /// contained by this . + /// + /// An for the current . + public IEnumerator GetEnumerator() + { + return _frames.GetEnumerator(); + } + + IEnumerator IEnumerable.GetEnumerator() + { + return GetEnumerator(); + } + + internal void AppendShallowCopy(Frame frame) + { + if (frame == null) + { + throw new ArgumentNullException("frame"); + } + + _frames.Add(new Frame(frame)); + } + + private void NormalizeFrames() + { + if (_frames.Count == 0) + { + return; + } + + for (int i = 0; i < _frames.Count; i++) + { + _frames[i].HasMore = i < _frames.Count - 1; + } + } + } +} diff --git a/src/Core/Poller.cs b/src/Core/Poller.cs new file mode 100644 index 0000000..97aed08 --- /dev/null +++ b/src/Core/Poller.cs @@ -0,0 +1,233 @@ +namespace CrossroadsIO +{ + using System; + using System.Collections.Generic; + using System.Diagnostics; + using System.Linq; + using System.Threading; + + using CrossroadsIO.Interop; + + /// + /// Multiplexes input/output events in a level-triggered fashion over a set of sockets. + /// + /// + /// + /// Sockets will be polled according to their capabilities. For example, sockets that are + /// receive-only (e.g., PULL and SUB sockets) will only poll for Input events. Sockets that + /// can both send and receive (e.g., REP, REQ, etc.) will poll for both Input and Output events. + /// + /// + /// To actually send or receive data, the socket's and/or + /// event handlers must be attached to. If attached, these will + /// be invoked when data is ready to be received or sent. + /// + /// + public class Poller : IDisposable + { + private readonly Dictionary _pollableSockets; + private readonly PollerProxy _pollerProxy; + + private PollItem[] _pollItems; + + /// + /// Initializes a new instance of the class. + /// + public Poller() + : this(new PollerProxy()) + { + } + + /// + /// Initializes a new instance of the class with a collection of sockets to poll over. + /// + /// The collection of s to poll. + public Poller(IEnumerable socketsToPoll) + : this() + { + AddSockets(socketsToPoll); + } + + internal Poller(PollerProxy pollerProxy) + { + if (pollerProxy == null) + { + throw new ArgumentNullException("pollerProxy"); + } + + _pollerProxy = pollerProxy; + _pollableSockets = new Dictionary(); + Pulse = new AutoResetEvent(false); + } + + /// + /// Gets an that is pulsed after every Poll call. + /// + public AutoResetEvent Pulse { get; private set; } + + /// + /// Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources. + /// + public void Dispose() + { + Dispose(true); + GC.SuppressFinalize(this); + } + + /// + /// Add a socket that will be polled for input/output events, depending on its capabilities. + /// + /// The to poll. + public void AddSocket(Socket socket) + { + if (socket == null) + { + throw new ArgumentNullException("socket"); + } + + _pollableSockets.Add(new PollItem(socket.SocketHandle, socket.GetPollEvents()), socket); + } + + /// + /// Add a collection of sockets that will be polled for input/output events, depending on their capabilities. + /// + /// The collection of s to poll. + public void AddSockets(IEnumerable sockets) + { + if (sockets == null) + { + throw new ArgumentNullException("sockets"); + } + + foreach (var socket in sockets) + { + AddSocket(socket); + } + } + + /// + /// Removes all sockets from the current collection. + /// + public void ClearSockets() + { + _pollableSockets.Clear(); + } + + /// + /// Multiplex input/output events over the contained set of sockets in blocking mode, firing + /// or as appropriate. + /// + /// An error occurred polling for socket events. + public void Poll() + { + PollBlocking(); + } + + /// + /// Multiplex input/output events over the contained set of sockets in non-blocking mode, firing + /// or as appropriate. + /// Returns when one or more events are ready to fire or when the specified timeout elapses, whichever + /// comes first. + /// + /// A indicating the timeout value. + /// An error occurred polling for socket events. + public void Poll(TimeSpan timeout) + { + if (timeout.TotalMilliseconds == Timeout.Infinite) + { + PollBlocking(); + } + else + { + PollNonBlocking(timeout); + } + } + + /// + /// Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources. + /// + /// True if the object is being disposed, false otherwise. + protected virtual void Dispose(bool disposing) + { + if (disposing) + { + Pulse.Dispose(); + } + } + + private static void ContinueIfInterrupted() + { + // An error value of EINTR indicates that the operation was interrupted + // by delivery of a signal before any events were available. This is a recoverable + // error, so try polling again for the remaining amount of time in the timeout. + if (!ErrorProxy.ThreadWasInterrupted) + { + throw new XsSocketException(ErrorProxy.GetLastError()); + } + } + + private void PollBlocking() + { + CreatePollItems(); + + while (Poll(Timeout.Infinite) == -1 && !ErrorProxy.ContextWasTerminated) + { + ContinueIfInterrupted(); + } + } + + private void PollNonBlocking(TimeSpan timeout) + { + CreatePollItems(); + + var remainingTimeout = (int)timeout.TotalMilliseconds; + var elapsed = Stopwatch.StartNew(); + + do + { + int result = Poll(remainingTimeout); + + if (result >= 0 || ErrorProxy.ContextWasTerminated) + { + break; + } + + ContinueIfInterrupted(); + remainingTimeout -= (int)elapsed.ElapsedMilliseconds; + } + while (remainingTimeout >= 0); + } + + private void CreatePollItems() + { + if (_pollItems == null || _pollItems.Length != _pollableSockets.Count) + { + _pollItems = _pollableSockets.Keys.ToArray(); + } + } + + private int Poll(int timeoutMilliseconds) + { + if (_pollableSockets.Count == 0) + { + throw new InvalidOperationException("At least one socket is required for polling."); + } + + int readyCount = _pollerProxy.Poll(_pollItems, timeoutMilliseconds); + + Pulse.Set(); + + if (readyCount > 0) + { + foreach (PollItem pollItem in _pollItems.Where(item => item.ReadyEvents != (short)PollEvents.None)) + { + Socket socket = _pollableSockets[pollItem]; + + socket.InvokePollEvents((PollEvents)pollItem.ReadyEvents); + } + } + + return readyCount; + } + } +} diff --git a/src/Core/ProtocolType.cs b/src/Core/ProtocolType.cs new file mode 100644 index 0000000..5286bce --- /dev/null +++ b/src/Core/ProtocolType.cs @@ -0,0 +1,18 @@ +namespace CrossroadsIO +{ + /// + /// Specifies the protocols that a supports. + /// + public enum ProtocolType + { + /// + /// Both Internet Protocol versions 4 and 6. + /// + Both = 0, + + /// + /// Internet Protocol version 4. + /// + Ipv4Only = 1 + } +} diff --git a/src/Core/ReceiveSocket.cs b/src/Core/ReceiveSocket.cs new file mode 100644 index 0000000..843beee --- /dev/null +++ b/src/Core/ReceiveSocket.cs @@ -0,0 +1,29 @@ +namespace CrossroadsIO +{ + using System; + + using CrossroadsIO.Interop; + + internal class ReceiveSocket : Socket + { + internal ReceiveSocket(SocketProxy socketProxy, SocketType socketType) + : base(socketProxy, socketType) + { + } + + public override int Send(byte[] buffer, int size, SocketFlags flags) + { + throw new NotSupportedException(); + } + + public override void Subscribe(byte[] prefix) + { + throw new NotSupportedException(); + } + + public override void Unsubscribe(byte[] prefix) + { + throw new NotSupportedException(); + } + } +} diff --git a/src/Core/ReceiveStatus.cs b/src/Core/ReceiveStatus.cs new file mode 100644 index 0000000..e7f63ca --- /dev/null +++ b/src/Core/ReceiveStatus.cs @@ -0,0 +1,28 @@ +namespace CrossroadsIO +{ + /// + /// Specifies possible results for socket receive operations. + /// + public enum ReceiveStatus + { + /// + /// No receive operation has been performed. + /// + None, + + /// + /// The receive operation returned a message that contains data. + /// + Received, + + /// + /// Non-blocking mode was requested and no messages are available at the moment. + /// + TryAgain, + + /// + /// The receive operation was interrupted, likely by terminating the containing context. + /// + Interrupted + } +} diff --git a/src/Core/SendReceiveExtensions.cs b/src/Core/SendReceiveExtensions.cs new file mode 100644 index 0000000..044e90e --- /dev/null +++ b/src/Core/SendReceiveExtensions.cs @@ -0,0 +1,530 @@ +namespace CrossroadsIO +{ + using System; + using System.Text; + + /// + /// Defines extensions for Send/Receive methods in . + /// + public static class SendReceiveExtensions + { + /// + /// Queue a single-part (or final multi-part) message buffer to be sent by the socket in blocking mode. + /// + /// + /// This method assumes that the message fills the entire buffer. + /// + /// A object. + /// A array that contains the message to be sent. + /// A describing the outcome of the send operation. + /// is null. + /// An error occurred sending data to a remote endpoint. + /// The has been closed. + /// The current socket type does not support Send operations. + public static SendStatus Send(this Socket socket, byte[] buffer) + { + VerifySocket(socket); + + socket.Send(buffer, buffer.Length, SocketFlags.None); + + return socket.SendStatus; + } + + /// + /// Queue a single-part (or final multi-part) message buffer to be sent by the socket in + /// non-blocking mode with a specified timeout. + /// + /// + /// This method assumes that the message fills the entire buffer. + /// + /// A object. + /// A array that contains the message to be sent. + /// A specifying the send timeout. + /// A describing the outcome of the send operation. + /// is null. + /// An error occurred sending data to a remote endpoint. + /// The has been closed. + /// The current socket type does not support Send operations. + public static SendStatus Send(this Socket socket, byte[] buffer, TimeSpan timeout) + { + VerifySocket(socket); + + socket.Send(buffer, buffer.Length, SocketFlags.None, timeout); + + return socket.SendStatus; + } + + /// + /// Queue a single-part (or final multi-part) message string to be sent by the socket in blocking mode. + /// + /// A object. + /// A that contains the message to be sent. + /// The to use when converting to a buffer. + /// A describing the outcome of the send operation. + /// or is null. + /// An error occurred sending data to a remote endpoint. + /// The has been closed. + /// The current socket type does not support Send operations. + public static SendStatus Send(this Socket socket, string message, Encoding encoding) + { + return Send(socket, message, encoding, TimeSpan.MaxValue); + } + + /// + /// Queue a single-part (or final multi-part) message string to be sent by the socket in + /// non-blocking mode with a specified timeout. + /// + /// A object. + /// A that contains the message to be sent. + /// The to use when converting to a buffer. + /// A specifying the send timeout. + /// A describing the outcome of the send operation. + /// or is null. + /// An error occurred sending data to a remote endpoint. + /// The has been closed. + /// The current socket type does not support Send operations. + public static SendStatus Send(this Socket socket, string message, Encoding encoding, TimeSpan timeout) + { + VerifySocket(socket); + VerifyStringMessage(message); + VerifyEncoding(encoding); + + byte[] buffer = encoding.GetBytes(message); + + socket.Send(buffer, buffer.Length, SocketFlags.None, timeout); + + return socket.SendStatus; + } + + /// + /// Queue a non-final message-part buffer to be sent by the socket in blocking mode. + /// + /// + /// This method assumes that the message fills the entire buffer. The final message-part in + /// this series must be sent with or another overload + /// that does not specify . + /// + /// A object. + /// A array that contains the message to be sent. + /// A describing the outcome of the send operation. + /// is null. + /// An error occurred sending data to a remote endpoint. + /// The has been closed. + /// The current socket type does not support Send operations. + public static SendStatus SendMore(this Socket socket, byte[] buffer) + { + return SendMore(socket, buffer, TimeSpan.MaxValue); + } + + /// + /// Queue a non-final message-part buffer to be sent by the socket in non-blocking mode with a specified timeout. + /// + /// + /// This method assumes that the message fills the entire buffer. The final message-part in + /// this series must be sent with or another overload + /// that does not specify . + /// + /// A object. + /// A array that contains the message to be sent. + /// A specifying the send timeout. + /// A describing the outcome of the send operation. + /// is null. + /// An error occurred sending data to a remote endpoint. + /// The has been closed. + /// The current socket type does not support Send operations. + public static SendStatus SendMore(this Socket socket, byte[] buffer, TimeSpan timeout) + { + VerifySocket(socket); + + socket.Send(buffer, buffer.Length, SocketFlags.SendMore, timeout); + + return socket.SendStatus; + } + + /// + /// Queue a non-final message-part string to be sent by the socket in blocking mode. + /// + /// A object. + /// A that contains the message to be sent. + /// The to use when converting to a buffer. + /// A describing the outcome of the send operation. + /// or is null. + /// An error occurred sending data to a remote endpoint. + /// The has been closed. + /// The current socket type does not support Send operations. + public static SendStatus SendMore(this Socket socket, string message, Encoding encoding) + { + return SendMore(socket, message, encoding, TimeSpan.MaxValue); + } + + /// + /// Queue a non-final message-part string to be sent by the socket in non-blocking mode with a specified timeout. + /// + /// A object. + /// A that contains the message to be sent. + /// The to use when converting to a buffer. + /// A specifying the send timeout. + /// A describing the outcome of the send operation. + /// or is null. + /// An error occurred sending data to a remote endpoint. + /// The has been closed. + /// The current socket type does not support Send operations. + public static SendStatus SendMore(this Socket socket, string message, Encoding encoding, TimeSpan timeout) + { + VerifySocket(socket); + VerifyStringMessage(message); + VerifyEncoding(encoding); + + byte[] buffer = encoding.GetBytes(message); + + socket.Send(buffer, buffer.Length, SocketFlags.SendMore, timeout); + + return socket.SendStatus; + } + + /// + /// Receive a string message from a remote socket in blocking mode. + /// + /// A object. + /// The to use when converting the received buffer to a string. + /// A containing the message received from the remote endpoint. + /// is null. + /// An error occurred receiving data from a remote endpoint. + /// The has been closed. + /// The current socket type does not support Receive operations. + public static string Receive(this Socket socket, Encoding encoding) + { + return Receive(socket, encoding, TimeSpan.MaxValue); + } + + /// + /// Receive a string message from a remote socket in non-blocking mode with a specified timeout. + /// + /// A object. + /// The to use when converting the received buffer to a string. + /// A specifying the receive timeout. + /// A containing the message received from the remote endpoint. + /// is null. + /// An error occurred receiving data from a remote endpoint. + /// The has been closed. + /// The current socket type does not support Receive operations. + public static string Receive(this Socket socket, Encoding encoding, TimeSpan timeout) + { + VerifySocket(socket); + VerifyEncoding(encoding); + + int messageSize; + byte[] buffer = socket.Receive(null, timeout, out messageSize); + + return encoding.GetString(buffer, 0, messageSize); + } + + /// + /// Receive a single frame from a remote socket in blocking mode. + /// + /// + /// This overload will allocate a new for receiving all available data in the message-part. + /// + /// A object. + /// A containing the data received from the remote endpoint. + /// An error occurred receiving data from a remote endpoint. + /// The has been closed. + /// The current socket type does not support Receive operations. + public static Frame ReceiveFrame(this Socket socket) + { + return ReceiveFrame(socket, null); + } + + /// + /// Receive a single frame from a remote socket in non-blocking mode with a specified timeout. + /// + /// + /// This overload will allocate a new for receiving all available data in the message-part. + /// + /// A object. + /// A specifying the receive timeout. + /// A containing the data received from the remote endpoint. + /// An error occurred receiving data from a remote endpoint. + /// The has been closed. + /// The current socket type does not support Receive operations. + public static Frame ReceiveFrame(this Socket socket, TimeSpan timeout) + { + return ReceiveFrame(socket, null, timeout); + } + + /// + /// Receive a single frame from a remote socket in blocking mode. + /// + /// + /// This overload will receive all available data in the message-part. If the buffer size of + /// is insufficient, a new buffer will be allocated. + /// + /// A object. + /// A that will store the received data. + /// A containing the data received from the remote endpoint. + /// An error occurred receiving data from a remote endpoint. + /// The has been closed. + /// The current socket type does not support Receive operations. + public static Frame ReceiveFrame(this Socket socket, Frame frame) + { + VerifySocket(socket); + + if (frame == null) + { + frame = new Frame(0); + } + + int size; + + frame.Buffer = socket.Receive(frame.Buffer, out size); + SetFrameProperties(frame, socket, size); + + return frame; + } + + /// + /// Receive a single frame from a remote socket in non-blocking mode with a specified timeout. + /// + /// + /// This overload will receive all available data in the message-part. If the buffer size of + /// is insufficient, a new buffer will be allocated. + /// + /// A object. + /// A that will store the received data. + /// A specifying the receive timeout. + /// A containing the data received from the remote endpoint. + /// An error occurred receiving data from a remote endpoint. + /// The has been closed. + /// The current socket type does not support Receive operations. + public static Frame ReceiveFrame(this Socket socket, Frame frame, TimeSpan timeout) + { + VerifySocket(socket); + + if (frame == null) + { + frame = new Frame(0); + } + + int size; + + frame.Buffer = socket.Receive(frame.Buffer, timeout, out size); + SetFrameProperties(frame, socket, size); + + return frame; + } + + /// + /// Queue a message frame to be sent by the socket in blocking mode. + /// + /// + /// The property on will be used to indicate whether + /// more frames will follow in the current multi-part message sequence. + /// + /// A object. + /// A that contains the message to be sent. + /// A describing the outcome of the send operation. + /// is null. + /// An error occurred sending data to a remote endpoint. + /// The has been closed. + /// The current socket type does not support Send operations. + public static SendStatus SendFrame(this Socket socket, Frame frame) + { + return SendFrame(socket, frame, TimeSpan.MaxValue); + } + + /// + /// Queue a message frame to be sent by the socket in non-blocking mode with a specified timeout. + /// + /// + /// The property on will be used to indicate whether + /// more frames will follow in the current multi-part message sequence. + /// + /// A object. + /// A that contains the message to be sent. + /// A specifying the send timeout. + /// A describing the outcome of the send operation. + /// is null. + /// An error occurred sending data to a remote endpoint. + /// The has been closed. + /// The current socket type does not support Send operations. + public static SendStatus SendFrame(this Socket socket, Frame frame, TimeSpan timeout) + { + VerifySocket(socket); + VerifyFrame(frame); + + socket.Send(frame.Buffer, frame.MessageSize, frame.HasMore ? SocketFlags.SendMore : SocketFlags.None, timeout); + + return socket.SendStatus; + } + + /// + /// Receive all parts of a multi-part message from a remote socket in blocking mode. + /// + /// A object. + /// A containing a collection of s received from the remote endpoint. + /// An error occurred receiving data from a remote endpoint. + /// The has been closed. + /// The current socket type does not support Receive operations. + public static Message ReceiveMessage(this Socket socket) + { + return ReceiveMessage(socket, new Message()); + } + + /// + /// Receive all parts of a multi-part message from a remote socket in blocking mode + /// and append them to a given message. + /// + /// A object. + /// The to which message-parts will be appended. + /// The supplied with newly received objects appended. + /// is null. + /// An error occurred receiving data from a remote endpoint. + /// The has been closed. + /// The current socket type does not support Receive operations. + public static Message ReceiveMessage(this Socket socket, Message message) + { + return ReceiveMessage(socket, message, TimeSpan.MaxValue); + } + + /// + /// Receive all parts of a multi-part message from a remote socket in non-blocking mode. + /// + /// + /// The will be used for each underlying Receive operation. If the timeout + /// elapses before the last message is received, an incomplete message will be returned. Use the + /// overload to continue + /// appending message-parts if the returned has its + /// property set to false. + /// + /// A object. + /// A specifying the receive timeout for each frame. + /// A containing newly received objects. + /// An error occurred receiving data from a remote endpoint. + /// The has been closed. + /// The current socket type does not support Receive operations. + public static Message ReceiveMessage(this Socket socket, TimeSpan frameTimeout) + { + return ReceiveMessage(socket, new Message(), frameTimeout); + } + + /// + /// Receive all parts of a multi-part message from a remote socket in non-blocking mode. + /// + /// + /// The will be used for each underlying Receive operation. If the timeout + /// elapses before the last message is received, an incomplete message will be returned. + /// + /// A object. + /// The to which message-parts will be appended. + /// A specifying the receive timeout for each frame. + /// A containing newly received objects. + /// An error occurred receiving data from a remote endpoint. + /// The has been closed. + /// The current socket type does not support Receive operations. + public static Message ReceiveMessage(this Socket socket, Message message, TimeSpan frameTimeout) + { + VerifySocket(socket); + VerifyMessage(message); + + Frame frame; + + do + { + frame = socket.ReceiveFrame(frameTimeout); + + if (frame.ReceiveStatus == ReceiveStatus.Received) + { + message.AppendShallowCopy(frame); + } + } + while (frame.ReceiveStatus == ReceiveStatus.Received && frame.HasMore); + + return message; + } + + /// + /// Queue a multi-part message to be sent by the socket in blocking mode. + /// + /// A object. + /// A that contains the message parts to be sent. + /// A describing the outcome of the send operation. + /// is null. + /// is incomplete. + /// An error occurred sending data to a remote endpoint. + /// The has been closed. + /// The current socket type does not support Send operations. + public static SendStatus SendMessage(this Socket socket, Message message) + { + VerifySocket(socket); + VerifyMessage(message); + + if (message.IsEmpty) + { + return SendStatus.Sent; + } + + if (!message.IsComplete) + { + throw new ArgumentException("Unable to send an incomplete message. Ensure HasMore on the last Frame is set to 'false'.", "message"); + } + + foreach (Frame frame in message) + { + socket.SendFrame(frame); + } + + return socket.SendStatus; + } + + private static void SetFrameProperties(Frame frame, Socket socket, int size) + { + if (size >= 0) + { + frame.MessageSize = size; + } + + frame.HasMore = socket.ReceiveMore; + frame.ReceiveStatus = socket.ReceiveStatus; + } + + private static void VerifySocket(Socket socket) + { + if (socket == null) + { + throw new ArgumentNullException("socket"); + } + } + + private static void VerifyMessage(Message message) + { + if (message == null) + { + throw new ArgumentNullException("message"); + } + } + + private static void VerifyFrame(Frame frame) + { + if (frame == null) + { + throw new ArgumentNullException("frame"); + } + } + + private static void VerifyStringMessage(string message) + { + if (message == null) + { + throw new ArgumentNullException("message"); + } + } + + private static void VerifyEncoding(Encoding encoding) + { + if (encoding == null) + { + throw new ArgumentNullException("encoding"); + } + } + } +} diff --git a/src/Core/SendSocket.cs b/src/Core/SendSocket.cs new file mode 100644 index 0000000..bae971f --- /dev/null +++ b/src/Core/SendSocket.cs @@ -0,0 +1,34 @@ +namespace CrossroadsIO +{ + using System; + + using CrossroadsIO.Interop; + + internal class SendSocket : Socket + { + internal SendSocket(SocketProxy socketProxy, SocketType socketType) + : base(socketProxy, socketType) + { + } + + public override void Subscribe(byte[] prefix) + { + throw new NotSupportedException(); + } + + public override void Unsubscribe(byte[] prefix) + { + throw new NotSupportedException(); + } + + internal override int Receive(byte[] buffer, SocketFlags flags) + { + throw new NotSupportedException(); + } + + internal override byte[] Receive(byte[] frame, SocketFlags flags, out int size) + { + throw new NotSupportedException(); + } + } +} diff --git a/src/Core/SendStatus.cs b/src/Core/SendStatus.cs new file mode 100644 index 0000000..03bee77 --- /dev/null +++ b/src/Core/SendStatus.cs @@ -0,0 +1,33 @@ +namespace CrossroadsIO +{ + /// + /// Specifies possible results for socket send operations. + /// + public enum SendStatus + { + /// + /// No send operation was performed. + /// + None, + + /// + /// The message was queued to be sent by the socket. + /// + Sent, + + /// + /// The send operation completed but only part of the requested message was sent. + /// + Incomplete, + + /// + /// Non-blocking mode was requested and the message cannot be sent at the moment. + /// + TryAgain, + + /// + /// The send operation was interrupted, likely by terminating the containing context. + /// + Interrupted + } +} diff --git a/src/Core/Socket.cs b/src/Core/Socket.cs new file mode 100644 index 0000000..4513f14 --- /dev/null +++ b/src/Core/Socket.cs @@ -0,0 +1,859 @@ +namespace CrossroadsIO +{ + using System; + using System.Diagnostics; + using System.Threading; + + using CrossroadsIO.Interop; + + /// + /// Sends and receives messages across various transports to potentially multiple endpoints + /// using the XS protocol. + /// + public class Socket : IDisposable + { + private static readonly int ProcessorCount = Environment.ProcessorCount; + + private readonly SocketProxy _socketProxy; + + private bool _disposed; + + internal Socket(SocketProxy socketProxy, SocketType socketType) + { + if (socketProxy == null) + { + throw new ArgumentNullException("socketProxy"); + } + + _socketProxy = socketProxy; + SocketType = socketType; + } + + /// + /// Finalizes an instance of the class. + /// + ~Socket() + { + Dispose(false); + } + + /// + /// Occurs when at least one message may be received from the socket without blocking. + /// + public event EventHandler ReceiveReady; + + /// + /// Occurs when at least one message may be sent via the socket without blocking. + /// + public event EventHandler SendReady; + + /// + /// Gets the value for the current socket. + /// + public SocketType SocketType { get; private set; } + + /// + /// Gets or sets the I/O thread affinity for newly created connections on this socket. + /// + /// An error occurred when getting or setting the socket option. + /// The has been closed. + public ulong Affinity + { + get { return GetSocketOptionUInt64(SocketOption.AFFINITY); } + set { SetSocketOption(SocketOption.AFFINITY, value); } + } + + /// + /// Gets or sets the maximum length of the queue of outstanding peer connections. (Default = 100 connections). + /// + /// An error occurred when getting or setting the socket option. + /// The has been closed. + public int Backlog + { + get { return GetSocketOptionInt32(SocketOption.BACKLOG); } + set { SetSocketOption(SocketOption.BACKLOG, value); } + } + + /// + /// Gets or sets the identity of the current socket. + /// + /// An error occurred when getting or setting the socket option. + /// The has been closed. + public byte[] Identity + { + get { return GetSocketOptionBytes(SocketOption.IDENTITY); } + set { SetSocketOption(SocketOption.IDENTITY, value); } + } + + /// + /// Gets or sets a value indicating whether protocol keepalives are enabled for the current socket, + /// if supported by the underlying transport mechanism. (Default = false). + /// + /// An error occurred when getting or setting the socket option. + /// The has been closed. + public bool KeepAlive + { + get { return GetSocketOptionInt32(SocketOption.KEEPALIVE) == 1; } + set { SetSocketOption(SocketOption.KEEPALIVE, value ? 1 : 0); } + } + + /// + /// Gets or sets the linger period for socket shutdown. (Default = , infinite). + /// + /// An error occurred when getting or setting the socket option. + /// The has been closed. + public TimeSpan Linger + { + get { return TimeSpan.FromMilliseconds(GetSocketOptionInt32(SocketOption.LINGER)); } + set { SetSocketOption(SocketOption.LINGER, (int)value.TotalMilliseconds); } + } + + /// + /// Gets or sets the maximum size for inbound messages (bytes). (Default = -1, no limit). + /// + /// An error occurred when getting or setting the socket option. + /// The has been closed. + public long MaxMessageSize + { + get { return GetSocketOptionInt64(SocketOption.MAX_MSG_SIZE); } + set { SetSocketOption(SocketOption.MAX_MSG_SIZE, value); } + } + + /// + /// Gets or sets the time-to-live field in every multicast packet sent from this socket (network hops). (Default = 1 hop). + /// + /// An error occurred when getting or setting the socket option. + /// The has been closed. + public int MulticastHops + { + get { return GetSocketOptionInt32(SocketOption.MULTICAST_HOPS); } + set { SetSocketOption(SocketOption.MULTICAST_HOPS, value); } + } + + /// + /// Gets or sets the maximum send or receive data rate for multicast transports (kbps). (Default = 100 kbps). + /// + /// An error occurred when getting or setting the socket option. + /// The has been closed. + public int MulticastRate + { + get { return GetSocketOptionInt32(SocketOption.RATE); } + set { SetSocketOption(SocketOption.RATE, value); } + } + + /// + /// Gets or sets the recovery interval for multicast transports. (Default = 10 seconds). + /// + /// An error occurred when getting or setting the socket option. + /// The has been closed. + public TimeSpan MulticastRecoveryInterval + { + get { return TimeSpan.FromMilliseconds(GetSocketOptionInt32(SocketOption.RECOVERY_IVL)); } + set { SetSocketOption(SocketOption.RECOVERY_IVL, (int)value.TotalMilliseconds); } + } + + /// + /// Gets or sets the underlying kernel receive buffer size for the current socket (bytes). (Default = 0, OS default). + /// + /// An error occurred when getting or setting the socket option. + /// The has been closed. + public int ReceiveBufferSize + { + get { return GetSocketOptionInt32(SocketOption.RCVBUF); } + set { SetSocketOption(SocketOption.RCVBUF, value); } + } + + /// + /// Gets or sets the high water mark for inbound messages (number of messages). (Default = 0, no limit). + /// + /// An error occurred when getting or setting the socket option. + /// The has been closed. + public int ReceiveHighWatermark + { + get { return GetSocketOptionInt32(SocketOption.RCVHWM); } + set { SetSocketOption(SocketOption.RCVHWM, value); } + } + + /// + /// Gets a value indicating whether the multi-part message currently being read has more message parts to follow. + /// + /// An error occurred when getting or setting the socket option. + /// The has been closed. + public bool ReceiveMore + { + get { return GetSocketOptionInt32(SocketOption.RCVMORE) == 1; } + } + + /// + /// Gets or sets the timeout for receive operations. (Default = , infinite). + /// + /// An error occurred when getting or setting the socket option. + /// The has been closed. + public TimeSpan ReceiveTimeout + { + get { return TimeSpan.FromMilliseconds(GetSocketOptionInt32(SocketOption.RCVTIMEO)); } + set { SetSocketOption(SocketOption.RCVTIMEO, (int)value.TotalMilliseconds); } + } + + /// + /// Gets or sets the initial reconnection interval. (Default = 100 milliseconds). + /// + /// An error occurred when getting or setting the socket option. + /// The has been closed. + public TimeSpan ReconnectInterval + { + get { return TimeSpan.FromMilliseconds(GetSocketOptionInt32(SocketOption.RECONNECT_IVL)); } + set { SetSocketOption(SocketOption.RECONNECT_IVL, (int)value.TotalMilliseconds); } + } + + /// + /// Gets or sets the maximum reconnection interval. (Default = 0, only use ). + /// + /// An error occurred when getting or setting the socket option. + /// The has been closed. + public TimeSpan ReconnectIntervalMax + { + get { return TimeSpan.FromMilliseconds(GetSocketOptionInt32(SocketOption.RECONNECT_IVL_MAX)); } + set { SetSocketOption(SocketOption.RECONNECT_IVL_MAX, (int)value.TotalMilliseconds); } + } + + /// + /// Gets or sets the underlying kernel transmit buffer size for the current socket (bytes). (Default = 0, OS default). + /// + /// An error occurred when getting or setting the socket option. + /// The has been closed. + public int SendBufferSize + { + get { return GetSocketOptionInt32(SocketOption.SNDBUF); } + set { SetSocketOption(SocketOption.SNDBUF, value); } + } + + /// + /// Gets or sets the high water mark for outbound messages (number of messages). (Default = 0, no limit). + /// + /// An error occurred when getting or setting the socket option. + /// The has been closed. + public int SendHighWatermark + { + get { return GetSocketOptionInt32(SocketOption.SNDHWM); } + set { SetSocketOption(SocketOption.SNDHWM, value); } + } + + /// + /// Gets or sets the timeout for send operations. (Default = , infinite). + /// + /// An error occurred when getting or setting the socket option. + /// The has been closed. + public TimeSpan SendTimeout + { + get { return TimeSpan.FromMilliseconds(GetSocketOptionInt32(SocketOption.SNDTIMEO)); } + set { SetSocketOption(SocketOption.SNDTIMEO, (int)value.TotalMilliseconds); } + } + + /// + /// Gets or sets the supported socket protocol(s) when using TCP transports. (Default = ). + /// + /// An error occurred when getting or setting the socket option. + /// The has been closed. + public ProtocolType SupportedProtocol + { + get { return (ProtocolType)GetSocketOptionInt32(SocketOption.IPV4_ONLY); } + set { SetSocketOption(SocketOption.IPV4_ONLY, (int)value); } + } + + /// + /// Gets the status of the last Receive operation. + /// + public ReceiveStatus ReceiveStatus { get; private set; } + + /// + /// Gets the status of the last Send operation. + /// + public SendStatus SendStatus { get; private set; } + + internal IntPtr SocketHandle + { + get { return _socketProxy.SocketHandle; } + } + + /// + /// Create an endpoint for accepting connections and bind it to the current socket. + /// + /// A string consisting of a transport and an address, formatted as transport://address. + /// is null. + /// An error occurred binding the socket to an endpoint. + /// The has been closed. + public void Bind(string endpoint) + { + EnsureNotDisposed(); + + if (endpoint == null) + { + throw new ArgumentNullException("endpoint"); + } + + if (endpoint == string.Empty) + { + throw new ArgumentException("Unable to Bind to an empty endpoint.", "endpoint"); + } + + HandleProxyResult(_socketProxy.Bind(endpoint)); + } + + /// + /// Connect the current socket to the specified endpoint. + /// + /// A string consisting of a transport and an address, formatted as transport://address. + /// is null. + /// An error occurred connecting the socket to a remote endpoint. + /// The has been closed. + public void Connect(string endpoint) + { + EnsureNotDisposed(); + + if (endpoint == null) + { + throw new ArgumentNullException("endpoint"); + } + + if (endpoint == string.Empty) + { + throw new ArgumentException("Unable to Connect to an empty endpoint.", "endpoint"); + } + + HandleProxyResult(_socketProxy.Connect(endpoint)); + } + + /// + /// Destroy the current socket. + /// + /// + /// Any outstanding messages physically received from the network but not yet received by the application + /// with Receive shall be discarded. The behaviour for discarding messages sent by the application + /// with Send but not yet physically transferred to the network depends on the value of + /// the socket option. + /// + /// The underlying socket object is not valid. + public void Close() + { + HandleProxyResult(_socketProxy.Close()); + } + + /// + /// Receive a single message-part from a remote socket in blocking mode. + /// + /// + /// Warning: This overload will only receive as much data as can fit in the supplied . + /// It is intended to be used when the maximum messaging performance is required; it will not allocate a new + /// buffer (or copy received data) if the received message exceeds the current buffer size. + /// If the maximum message size is not known in advance, use the overload. + /// + /// A array that will store the received data. + /// The number of bytes contained in the resulting message. + /// is null. + /// An error occurred receiving data from a remote endpoint. + /// The has been closed. + /// The current socket type does not support Receive operations. + public int Receive(byte[] buffer) + { + return Receive(buffer, SocketFlags.None); + } + + /// + /// Receive a single message-part from a remote socket in non-blocking mode with a specified timeout. + /// + /// + /// Warning: This overload will only receive as much data as can fit in the supplied . + /// It is intended to be used when the maximum messaging performance is required, as it does not perform + /// any unnecessary memory allocation, copying or marshalling. + /// If the maximum message size is not known in advance, use the overload. + /// + /// A array that will store the received data. + /// A specifying the receive timeout. + /// + /// The number of bytes contained in the resulting message or -1 if the timeout expired or an interrupt occurred. + /// See for details. + /// + /// is null. + /// An error occurred receiving data from a remote endpoint. + /// The has been closed. + /// The current socket type does not support Receive operations. + public int Receive(byte[] buffer, TimeSpan timeout) + { + return timeout == TimeSpan.MaxValue + ? Receive(buffer) + : ExecuteWithTimeout(() => Receive(buffer, SocketFlags.DontWait), timeout); + } + + /// + /// Receive a single message-part from a remote socket in blocking mode. + /// + /// + /// This overload will receive all available data in the message-part. If the size of + /// is insufficient, a new buffer will be allocated. + /// + /// A array that may store the received data. + /// An that will contain the number of bytes in the received data. + /// + /// A array containing the data received from the remote endpoint, which may or may + /// not be the supplied . + /// + /// An error occurred receiving data from a remote endpoint. + /// The has been closed. + /// The current socket type does not support Receive operations. + public byte[] Receive(byte[] buffer, out int size) + { + return Receive(buffer, SocketFlags.None, out size); + } + + /// + /// Receive a single message-part from a remote socket in non-blocking mode with a specified timeout. + /// + /// + /// This overload will receive all available data in the message-part. If the size of + /// is insufficient, a new buffer will be allocated. + /// + /// A that will store the received data. + /// A specifying the receive timeout. + /// An that will contain the number of bytes in the received data. + /// + /// A array containing the data received from the remote endpoint, which may or may + /// not be the supplied . + /// + /// An error occurred receiving data from a remote endpoint. + /// The has been closed. + /// The current socket type does not support Receive operations. + public byte[] Receive(byte[] buffer, TimeSpan timeout, out int size) + { + if (timeout == TimeSpan.MaxValue) + { + return Receive(buffer, out size); + } + + int receivedBytes = -1; + + byte[] message = ExecuteWithTimeout(() => Receive(buffer, SocketFlags.DontWait, out receivedBytes), timeout); + size = receivedBytes; + + return message; + } + + /// + /// Queue a message buffer to be sent by the socket in blocking mode. + /// + /// A array that contains the message to be sent. + /// The size of the message to send. + /// A combination of values to use when sending. + /// The number of bytes sent by the socket. + /// is null. + /// is a negative value or is larger than the length of . + /// An error occurred sending data to a remote endpoint. + /// The has been closed. + /// The current socket type does not support Send operations. + public virtual int Send(byte[] buffer, int size, SocketFlags flags) + { + EnsureNotDisposed(); + + if (buffer == null) + { + throw new ArgumentNullException("buffer"); + } + + if (size < 0 || size > buffer.Length) + { + throw new ArgumentOutOfRangeException("size", "Expected a non-negative value less than or equal to the buffer length."); + } + + int sentBytes = _socketProxy.Send(buffer, size, (int)flags); + + if (sentBytes >= 0) + { + SendStatus = sentBytes == size ? SendStatus.Sent : SendStatus.Incomplete; + return sentBytes; + } + + if (ErrorProxy.ShouldTryAgain) + { + SendStatus = SendStatus.TryAgain; + return -1; + } + + if (ErrorProxy.ContextWasTerminated) + { + SendStatus = SendStatus.Interrupted; + return -1; + } + + throw new XsSocketException(ErrorProxy.GetLastError()); + } + + /// + /// Queue a message buffer to be sent by the socket in non-blocking mode with a specified timeout. + /// + /// A array that contains the message to be sent. + /// The size of the message to send. + /// A combination of values to use when sending. + /// A specifying the send timeout. + /// The number of bytes sent by the socket. + /// is null. + /// + /// is a negative value or is larger than the length of . + /// + /// An error occurred sending data to a remote endpoint. + /// The has been closed. + /// The current socket type does not support Send operations. + public int Send(byte[] buffer, int size, SocketFlags flags, TimeSpan timeout) + { + return timeout == TimeSpan.MaxValue + ? Send(buffer, size, flags & ~SocketFlags.DontWait) + : ExecuteWithTimeout(() => Send(buffer, size, flags | SocketFlags.DontWait), timeout); + } + + /// + /// Forwards a single-part or all parts of a multi-part message to a destination socket. + /// + /// + /// This method is useful for implementing devices as data is not marshalled into managed code; it + /// is forwarded directly in the unmanaged layer. As an example, this method could forward all traffic + /// from a device's front-end socket to its backend socket. + /// + /// A that will receive the incoming message(s). + public void Forward(Socket destination) + { + if (destination == null) + { + throw new ArgumentNullException("destination"); + } + + if (_socketProxy.Forward(destination.SocketHandle) == -1) + { + throw new XsSocketException(ErrorProxy.GetLastError()); + } + } + + /// + /// Subscribe to all messages. + /// + /// + /// Only applies to and sockets. + /// + /// The has been closed. + /// The current socket type does not support subscriptions. + public void SubscribeAll() + { + Subscribe(new byte[0]); + } + + /// + /// Subscribe to messages that begin with a specified prefix. + /// + /// + /// Only applies to and sockets. + /// + /// Prefix for subscribed messages. + /// is null. + /// The has been closed. + /// The current socket type does not support subscriptions. + public virtual void Subscribe(byte[] prefix) + { + SetSocketOption(SocketOption.SUBSCRIBE, prefix); + } + + /// + /// Unsubscribe from all messages. + /// + /// + /// Only applies to and sockets. + /// + /// The has been closed. + /// The current socket type does not support subscriptions. + public void UnsubscribeAll() + { + Unsubscribe(new byte[0]); + } + + /// + /// Unsubscribe from messages that begin with a specified prefix. + /// + /// + /// Only applies to and sockets. + /// + /// Prefix for subscribed messages. + /// is null. + /// The has been closed. + /// The current socket type does not support subscriptions. + public virtual void Unsubscribe(byte[] prefix) + { + SetSocketOption(SocketOption.UNSUBSCRIBE, prefix); + } + + /// + /// Releases all resources used by the current instance of the class. + /// + public virtual void Dispose() + { + Dispose(true); + GC.SuppressFinalize(this); + } + + internal virtual int Receive(byte[] buffer, SocketFlags flags) + { + EnsureNotDisposed(); + + if (buffer == null) + { + throw new ArgumentNullException("buffer"); + } + + int receivedBytes = _socketProxy.Receive(buffer, (int)flags); + + if (receivedBytes >= 0) + { + ReceiveStatus = ReceiveStatus.Received; + return receivedBytes; + } + + if (ErrorProxy.ShouldTryAgain) + { + ReceiveStatus = ReceiveStatus.TryAgain; + return -1; + } + + if (ErrorProxy.ContextWasTerminated) + { + ReceiveStatus = ReceiveStatus.Interrupted; + return -1; + } + + throw new XsSocketException(ErrorProxy.GetLastError()); + } + + internal virtual byte[] Receive(byte[] buffer, SocketFlags flags, out int size) + { + EnsureNotDisposed(); + + if (buffer == null) + { + buffer = new byte[0]; + } + + buffer = _socketProxy.Receive(buffer, (int)flags, out size); + + if (size >= 0) + { + ReceiveStatus = ReceiveStatus.Received; + return buffer; + } + + if (ErrorProxy.ShouldTryAgain) + { + ReceiveStatus = ReceiveStatus.TryAgain; + return buffer; + } + + if (ErrorProxy.ContextWasTerminated) + { + ReceiveStatus = ReceiveStatus.Interrupted; + return buffer; + } + + throw new XsSocketException(ErrorProxy.GetLastError()); + } + + internal int GetSocketOptionInt32(SocketOption option) + { + EnsureNotDisposed(); + + int value; + + HandleProxyResult(_socketProxy.GetSocketOption((int)option, out value)); + + return value; + } + + internal long GetSocketOptionInt64(SocketOption option) + { + EnsureNotDisposed(); + + long value; + + HandleProxyResult(_socketProxy.GetSocketOption((int)option, out value)); + + return value; + } + + internal ulong GetSocketOptionUInt64(SocketOption option) + { + EnsureNotDisposed(); + + ulong value; + + HandleProxyResult(_socketProxy.GetSocketOption((int)option, out value)); + + return value; + } + + internal byte[] GetSocketOptionBytes(SocketOption option) + { + EnsureNotDisposed(); + + byte[] value; + + HandleProxyResult(_socketProxy.GetSocketOption((int)option, out value)); + + return value; + } + + internal void SetSocketOption(SocketOption option, int value) + { + EnsureNotDisposed(); + + HandleProxyResult(_socketProxy.SetSocketOption((int)option, value)); + } + + internal void SetSocketOption(SocketOption option, long value) + { + EnsureNotDisposed(); + + HandleProxyResult(_socketProxy.SetSocketOption((int)option, value)); + } + + internal void SetSocketOption(SocketOption option, ulong value) + { + EnsureNotDisposed(); + + HandleProxyResult(_socketProxy.SetSocketOption((int)option, value)); + } + + internal void SetSocketOption(SocketOption option, byte[] value) + { + if (value == null) + { + throw new ArgumentNullException("value"); + } + + EnsureNotDisposed(); + + HandleProxyResult(_socketProxy.SetSocketOption((int)option, value)); + } + + internal void InvokePollEvents(PollEvents readyEvents) + { + if (readyEvents.HasFlag(PollEvents.PollIn)) + { + InvokeReceiveReady(readyEvents); + } + + if (readyEvents.HasFlag(PollEvents.PollOut)) + { + InvokeSendReady(readyEvents); + } + } + + internal PollEvents GetPollEvents() + { + PollEvents events = PollEvents.None; + + if (ReceiveReady != null) + { + events |= PollEvents.PollIn; + } + + if (SendReady != null) + { + events |= PollEvents.PollOut; + } + + return events; + } + + /// + /// Releases the unmanaged resources used by the , and optionally disposes of the managed resources. + /// + /// true to release both managed and unmanaged resources; false to release only unmanaged resources. + protected virtual void Dispose(bool disposing) + { + if (!_disposed) + { + if (disposing) + { + _socketProxy.Dispose(); + } + } + + _disposed = true; + } + + private static void HandleProxyResult(int result) + { + // Context termination (ETERM) is an allowable error state, occurring when the + // Context was terminated during a socket method. + if (result == -1 && !ErrorProxy.ContextWasTerminated) + { + throw new XsSocketException(ErrorProxy.GetLastError()); + } + } + + private TResult ExecuteWithTimeout(Func method, TimeSpan timeout) + { + TResult receiveResult; + + int iterations = 0; + var timeoutMilliseconds = (int)timeout.TotalMilliseconds; + var timer = Stopwatch.StartNew(); + + do + { + receiveResult = method(); + + if (ReceiveStatus != ReceiveStatus.TryAgain || timeoutMilliseconds <= 1) + { + break; + } + + if (iterations < 20 && ProcessorCount > 1) + { + // If we have a short wait (< 20 iterations) we SpinWait to allow other threads + // on HyperThreaded CPUs to use the CPU. The more CPUs we have, the longer it's + // acceptable to SpinWait since we stall the overall system less. + Thread.SpinWait(100 * ProcessorCount); + } + else + { + Thread.Yield(); + } + + ++iterations; + } + while (timer.Elapsed < timeout); + + return receiveResult; + } + + private void InvokeReceiveReady(PollEvents readyEvents) + { + EventHandler handler = ReceiveReady; + if (handler != null) + { + handler(this, new SocketEventArgs(this, readyEvents)); + } + } + + private void InvokeSendReady(PollEvents readyEvents) + { + EventHandler handler = SendReady; + if (handler != null) + { + handler(this, new SocketEventArgs(this, readyEvents)); + } + } + + private void EnsureNotDisposed() + { + if (_disposed) + { + throw new ObjectDisposedException(GetType().FullName); + } + } + } +} diff --git a/src/Core/SocketEventArgs.cs b/src/Core/SocketEventArgs.cs new file mode 100644 index 0000000..691f296 --- /dev/null +++ b/src/Core/SocketEventArgs.cs @@ -0,0 +1,39 @@ +namespace CrossroadsIO +{ + using System; + + using CrossroadsIO.Interop; + + /// + /// Provides data for and events. + /// + public class SocketEventArgs : EventArgs + { + internal SocketEventArgs(Socket socket, PollEvents readyEvents) + { + if (socket == null) + { + throw new ArgumentNullException("socket"); + } + + Socket = socket; + ReceiveReady = readyEvents.HasFlag(PollEvents.PollIn); + SendReady = readyEvents.HasFlag(PollEvents.PollOut); + } + + /// + /// Gets the socket that may be used to receive or send at least one message without blocking. + /// + public Socket Socket { get; private set; } + + /// + /// Gets a value indicating whether at least one message may be received by the socket without blocking. + /// + public bool ReceiveReady { get; private set; } + + /// + /// Gets a value indicating whether at least one message may be sent by the socket without blocking. + /// + public bool SendReady { get; private set; } + } +} diff --git a/src/Core/SocketFlags.cs b/src/Core/SocketFlags.cs new file mode 100644 index 0000000..8b43d37 --- /dev/null +++ b/src/Core/SocketFlags.cs @@ -0,0 +1,26 @@ +namespace CrossroadsIO +{ + using System; + + /// + /// Flags used by socket Send and Receive operations. + /// + [Flags] + public enum SocketFlags + { + /// + /// No socket flags are specified. + /// + None = 0, + + /// + /// The operation should be performed in non-blocking mode. + /// + DontWait = 0x1, + + /// + /// The message being sent is a multi-part message, and that further message parts are to follow. + /// + SendMore = 0x2, + } +} diff --git a/src/Core/SocketOption.cs b/src/Core/SocketOption.cs new file mode 100644 index 0000000..d3b4a8c --- /dev/null +++ b/src/Core/SocketOption.cs @@ -0,0 +1,30 @@ +namespace CrossroadsIO +{ + internal enum SocketOption + { + AFFINITY = 4, + IDENTITY = 5, + SUBSCRIBE = 6, + UNSUBSCRIBE = 7, + RATE = 8, + RECOVERY_IVL = 9, + SNDBUF = 11, + RCVBUF = 12, + RCVMORE = 13, + FD = 14, + EVENTS = 15, + TYPE = 16, + LINGER = 17, + RECONNECT_IVL = 18, + BACKLOG = 19, + RECONNECT_IVL_MAX = 21, + MAX_MSG_SIZE = 22, + SNDHWM = 23, + RCVHWM = 24, + MULTICAST_HOPS = 25, + RCVTIMEO = 27, + SNDTIMEO = 28, + IPV4_ONLY = 31, + KEEPALIVE = 32, + } +} \ No newline at end of file diff --git a/src/Core/SocketType.cs b/src/Core/SocketType.cs new file mode 100644 index 0000000..16ee17d --- /dev/null +++ b/src/Core/SocketType.cs @@ -0,0 +1,80 @@ +namespace CrossroadsIO +{ + /// + /// Specifies possible socket types defined by XS messaging patterns. + /// + public enum SocketType + { + /// + /// Can only be connected to a single peer at any one time. + /// Part of the Exclusive Pair pattern. + /// + PAIR = 0, + + /// + /// Used by a publisher to distribute messages in a fan out fashion to all connected peers. + /// Part of the Publish-Subscribe pattern. + /// + PUB = 1, + + /// + /// Used by a subscriber to subscribe to data distributed by a publisher. + /// Part of the Publish-Subscribe pattern. + /// + SUB = 2, + + /// + /// Used by a client to send requests to and receive replies from a service. + /// Part of the Request-Reply pattern. + /// + REQ = 3, + + /// + /// Used by a service to receive requests from and send replies to a client. + /// Part of the Request-Reply pattern. + /// + REP = 4, + + /// + /// Underlying socket type for with no strict ordering rules for sends/receives. + /// Intended for use in intermediate devices in Request-Reply topologies. + /// + XREQ = 5, + + /// + /// Underlying socket type for with no strict ordering rules for sends/receives. + /// Intended for use in intermediate devices in Request-Reply topologies. + /// + XREP = 6, + + /// + /// Used by a pipeline node to receive messages from upstream pipeline nodes. + /// Part of the Pipeline pattern. + /// + PULL = 7, + + /// + /// Used by a pipeline node to send messages to downstream pipeline nodes. + /// Part of the Pipeline pattern. + /// + PUSH = 8, + + /// + /// Same as except subscriptions can be received from peers as incoming messages. + /// Part of the Publish-Subscribe pattern. + /// + /// + /// Subscription message is a byte '1' (for subscriptions) or byte '0' (for unsubscriptions) followed by the subscription body. + /// + XPUB = 9, + + /// + /// Same as except subscription messages can be sent to the publisher. + /// Part of the Publish-Subscribe pattern. + /// + /// + /// Subscription message is a byte '1' (for subscriptions) or byte '0' (for unsubscriptions) followed by the subscription body. + /// + XSUB = 10, + } +} \ No newline at end of file diff --git a/src/Core/SubscribeExtSocket.cs b/src/Core/SubscribeExtSocket.cs new file mode 100644 index 0000000..13d63d4 --- /dev/null +++ b/src/Core/SubscribeExtSocket.cs @@ -0,0 +1,47 @@ +namespace CrossroadsIO +{ + using System; + + using CrossroadsIO.Interop; + + internal class SubscribeExtSocket : Socket + { + public const byte SubscribePrefix = 1; + public const byte UnsubscribePrefix = 0; + + internal SubscribeExtSocket(SocketProxy socketProxy, SocketType socketType) + : base(socketProxy, socketType) + { + } + + public override void Subscribe(byte[] prefix) + { + if (prefix == null) + { + throw new ArgumentNullException("prefix"); + } + + SendWithPrefix(prefix, SubscribePrefix); + } + + public override void Unsubscribe(byte[] prefix) + { + if (prefix == null) + { + throw new ArgumentNullException("prefix"); + } + + SendWithPrefix(prefix, UnsubscribePrefix); + } + + private void SendWithPrefix(byte[] buffer, byte prefix) + { + var prefixedBuffer = new byte[buffer.Length + 1]; + + prefixedBuffer[0] = prefix; + buffer.CopyTo(prefixedBuffer, 1); + + Send(prefixedBuffer, prefixedBuffer.Length, SocketFlags.None); + } + } +} diff --git a/src/Core/SubscribeSocket.cs b/src/Core/SubscribeSocket.cs new file mode 100644 index 0000000..733c0ab --- /dev/null +++ b/src/Core/SubscribeSocket.cs @@ -0,0 +1,19 @@ +namespace CrossroadsIO +{ + using System; + + using CrossroadsIO.Interop; + + internal class SubscribeSocket : Socket + { + internal SubscribeSocket(SocketProxy socketProxy, SocketType socketType) + : base(socketProxy, socketType) + { + } + + public override int Send(byte[] buffer, int size, SocketFlags flags) + { + throw new NotSupportedException(); + } + } +} diff --git a/src/Core/XsException.cs b/src/Core/XsException.cs new file mode 100644 index 0000000..21e5012 --- /dev/null +++ b/src/Core/XsException.cs @@ -0,0 +1,81 @@ +namespace CrossroadsIO +{ + using System; + using System.Runtime.Serialization; + + using CrossroadsIO.Interop; + + /// + /// An exception thrown by the result of a Crossroads I/O library call. + /// + [Serializable] + public class XsException : Exception + { + /// + /// Initializes a new instance of the class. + /// + /// The error code returned by the Crossroads I/O library call. + public XsException(int errorCode) + { + ErrorCode = errorCode; + ErrorName = GetErrorName(errorCode); + } + + /// + /// Initializes a new instance of the class. + /// + /// The error code returned by the Crossroads I/O library call. + /// The message that describes the error + public XsException(int errorCode, string message) + : base(message) + { + ErrorCode = errorCode; + ErrorName = GetErrorName(errorCode); + } + + /// + /// Initializes a new instance of the class. + /// + /// The error code returned by the Crossroads I/O library call. + /// The message that describes the error + /// The exception that is the cause of the current exception, or a null reference (Nothing in Visual Basic) if no inner exception is specified. + public XsException(int errorCode, string message, Exception inner) + : base(message, inner) + { + ErrorCode = errorCode; + ErrorName = GetErrorName(errorCode); + } + + internal XsException(ErrorDetails errorDetails) + : this(errorDetails.ErrorCode, errorDetails.Message) + { + } + + /// + /// Initializes a new instance of the class. + /// + /// that holds the serialized object data about the exception being thrown. + /// that contains contextual information about the source or destination. + protected XsException(SerializationInfo info, StreamingContext context) + : base(info, context) + { + } + + /// + /// Gets the error code returned by the Crossroads I/O library call. + /// + public int ErrorCode { get; private set; } + + /// + /// Gets the string representation of the error code, as found in the Crossroads I/O docs. + /// + public string ErrorName { get; private set; } + + private static string GetErrorName(int errorCode) + { + return CrossroadsIO.ErrorCode.ErrorNames.ContainsKey(errorCode) + ? CrossroadsIO.ErrorCode.ErrorNames[errorCode] + : "Error " + errorCode; + } + } +} diff --git a/src/Core/XsSocketException.cs b/src/Core/XsSocketException.cs new file mode 100644 index 0000000..a8bd684 --- /dev/null +++ b/src/Core/XsSocketException.cs @@ -0,0 +1,59 @@ +namespace CrossroadsIO +{ + using System; + using System.Runtime.Serialization; + + using CrossroadsIO.Interop; + + /// + /// The exception that is thrown when a Crossroads I/O socket error occurs. + /// + [Serializable] + public class XsSocketException : XsException + { + /// + /// Initializes a new instance of the class. + /// + /// The error code returned by the Crossroads I/O library call. + public XsSocketException(int errorCode) + : base(errorCode) + { + } + + /// + /// Initializes a new instance of the class. + /// + /// The error code returned by the Crossroads I/O library call. + /// The message that describes the error + public XsSocketException(int errorCode, string message) + : base(errorCode, message) + { + } + + /// + /// Initializes a new instance of the class. + /// + /// The error code returned by the Crossroads I/O library call. + /// The message that describes the error + /// The exception that is the cause of the current exception, or a null reference if no inner exception is specified. + public XsSocketException(int errorCode, string message, Exception inner) + : base(errorCode, message, inner) + { + } + + internal XsSocketException(ErrorDetails errorDetails) + : base(errorDetails) + { + } + + /// + /// Initializes a new instance of the class. + /// + /// that holds the serialized object data about the exception being thrown. + /// that contains contextual information about the source or destination. + protected XsSocketException(SerializationInfo info, StreamingContext context) + : base(info, context) + { + } + } +} diff --git a/src/Core/XsVersion.cs b/src/Core/XsVersion.cs new file mode 100644 index 0000000..3fdf0bd --- /dev/null +++ b/src/Core/XsVersion.cs @@ -0,0 +1,133 @@ +namespace CrossroadsIO +{ + using System; + + using CrossroadsIO.Interop; + + /// + /// Provides Crossroads I/O version information. + /// + public class XsVersion + { + private static readonly Lazy CurrentVersion; + + static XsVersion() + { + CurrentVersion = new Lazy(GetCurrentVersion); + } + + private XsVersion(int major, int minor, int patch) + { + Major = major; + Minor = minor; + Patch = patch; + } + + /// + /// Gets a value for the current library version. + /// + public static XsVersion Current + { + get { return CurrentVersion.Value; } + } + + /// + /// Gets the major version part. + /// + public int Major { get; private set; } + + /// + /// Gets the minor version part. + /// + public int Minor { get; private set; } + + /// + /// Gets the patch version part. + /// + public int Patch { get; private set; } + + /// + /// Determine whether the current version of Crossroads I/O meets the specified minimum required version. + /// + /// An containing the minimum required major version. + /// true if the current Crossroads I/O version meets the minimum requirement; false otherwise. + public bool IsAtLeast(int requiredMajor) + { + return IsAtLeast(requiredMajor, 0); + } + + /// + /// Determine whether the current version of Crossroads I/O meets the specified minimum required version. + /// + /// An containing the minimum required major version. + /// An containing the minimum required minor version. + /// true if the current Crossroads I/O version meets the minimum requirement; false otherwise. + public bool IsAtLeast(int requiredMajor, int requiredMinor) + { + return Major >= requiredMajor && Minor >= requiredMinor; + } + + /// + /// Determine whether the current version of Crossroads I/O meets the specified maximum allowable version. + /// + /// An containing the maximum allowable major version. + /// true if the current Crossroads I/O version meets the maximum allowed; false otherwise. + public bool IsAtMost(int requiredMajor) + { + return IsAtMost(requiredMajor, int.MaxValue); + } + + /// + /// Determine whether the current version of Crossroads I/O meets the specified maximum allowable version. + /// + /// An containing the maximum allowable major version. + /// An containing the maximum allowable minor version. + /// true if the current Crossroads I/O version meets the maximum allowed; false otherwise. + public bool IsAtMost(int requiredMajor, int requiredMinor) + { + return Major <= requiredMajor && Minor <= requiredMinor; + } + + /// + /// Assert that the current version of Crossroads I/O meets the specified minimum required version. + /// + /// An containing the minimum required major version. + /// An containing the minimum required minor version. + /// The Crossroads I/O version does not meet the minimum requirements. + public void AssertAtLeast(int requiredMajor, int requiredMinor) + { + if (!IsAtLeast(requiredMajor, requiredMinor)) + { + throw new XsVersionException(Major, Minor, requiredMajor, requiredMinor); + } + } + + /// + /// Assert that the current version of Crossroads I/O meets the specified maximum allowed version. + /// + /// An containing the maximum allowable major version. + /// An containing the maximum allowable minor version. + /// The Crossroads I/O version does not meet the minimum requirements. + public void AssertAtMost(int requiredMajor, int requiredMinor) + { + if (!IsAtMost(requiredMajor, requiredMinor)) + { + throw new XsVersionException(Major, Minor, requiredMajor, requiredMinor); + } + } + + /// + /// Returns a that represents the current . + /// + /// A string containing the current Crossroads I/O version, formatted as "major.minor.patch". + public override string ToString() + { + return Major + "." + Minor + "." + Patch; + } + + private static XsVersion GetCurrentVersion() + { + return new XsVersion(LibXs.MajorVersion, LibXs.MinorVersion, LibXs.PatchVersion); + } + } +} diff --git a/src/Core/XsVersionException.cs b/src/Core/XsVersionException.cs new file mode 100644 index 0000000..379e448 --- /dev/null +++ b/src/Core/XsVersionException.cs @@ -0,0 +1,60 @@ +namespace CrossroadsIO +{ + using System; + using System.Runtime.Serialization; + + /// + /// The exception that is thrown when the current Crossroads I/O version does not meet application requirements. + /// + [Serializable] + public class XsVersionException : Exception + { + /// + /// Initializes a new instance of the class. + /// + public XsVersionException() + { + } + + /// + /// Initializes a new instance of the class. + /// + /// The current major XS version. + /// The current minor XS version. + /// The required major XS version. + /// The required minor XS version. + public XsVersionException(int major, int minor, int requiredMajor, int requiredMinor) + : base(string.Format("Invalid CrossroadsIO version. Current: {0}.{1}; required: {2}.{3}", major, minor, requiredMajor, requiredMinor)) + { + } + + /// + /// Initializes a new instance of the class. + /// + /// The message that describes the error + public XsVersionException(string message) + : base(message) + { + } + + /// + /// Initializes a new instance of the class. + /// + /// The message that describes the error + /// The exception that is the cause of the current exception, or a null reference (Nothing in Visual Basic) if no inner exception is specified. + public XsVersionException(string message, Exception inner) + : base(message, inner) + { + } + + /// + /// Initializes a new instance of the class. + /// + /// that holds the serialized object data about the exception being thrown. + /// that contains contextual information about the source or destination. + protected XsVersionException(SerializationInfo info, StreamingContext context) + : base(info, context) + { + } + } +}