28 changes: 14 additions & 14 deletions src/NetMQ.Tests/SocketTests.cs
Expand Up @@ -126,7 +126,7 @@ public void LargeMessageLittleEndian()
{
pub.Options.Endian = Endianness.Little;
sub.Options.Endian = Endianness.Little;

var port = pub.BindRandomPort("tcp://127.0.0.1");
sub.Connect("tcp://127.0.0.1:" + port);

Expand Down Expand Up @@ -274,7 +274,7 @@ public void BindRandom()
public void BindToLocal()
{
var validAliasesForLocalHost = new[] { "127.0.0.1", "localhost", Dns.GetHostName() };

foreach (var alias in validAliasesForLocalHost)
{
using (var context = NetMQContext.Create())
Expand Down Expand Up @@ -444,7 +444,7 @@ public void Disconnect(string protocol)
client.Send("1");
client.Send("2");

// make sure client is connected to both servers
// make sure client is connected to both servers
server1.SkipFrame();
server2.SkipFrame();

Expand Down Expand Up @@ -556,10 +556,10 @@ public void ASubscriberSocketThatGetDisconnectedBlocksItsContextFromBeingDispose
pub.Options.SendTimeout = TimeSpan.FromSeconds(2);

sub.Options.Linger = TimeSpan.FromSeconds(0);

sub.Connect("tcp://localhost:12345");
sub.Subscribe("");

// Thread.Sleep(1000);

pub.Bind("tcp://localhost:12345");
Expand All @@ -570,7 +570,7 @@ public void ASubscriberSocketThatGetDisconnectedBlocksItsContextFromBeingDispose
for (var i = 0; i < 100; i++)
{
var sent = "msg-" + i;

pub.Send(sent);

string received;
Expand All @@ -582,7 +582,7 @@ public void ASubscriberSocketThatGetDisconnectedBlocksItsContextFromBeingDispose

// Thread.Sleep(1000);
}
}
}

[Test]
public void BindRandomThenUnbind()
Expand All @@ -594,15 +594,15 @@ public void BindRandomThenUnbind()

pub.Unbind("tcp://localhost:" + port);
}

using (var context = NetMQContext.Create())
using (var pub = context.CreatePublisherSocket())
{
var port = pub.BindRandomPort("tcp://*");

pub.Unbind("tcp://*:" + port);
}

using (var context = NetMQContext.Create())
using (var pub = context.CreatePublisherSocket())
{
Expand All @@ -615,7 +615,7 @@ public void BindRandomThenUnbind()
pub.Unbind("tcp://*:" + port3);
}
}

[Test]
public void InprocRouterDealerTest()
{
Expand All @@ -637,7 +637,7 @@ public void InprocRouterDealerTest()
// Handle worker activity on backend
while (e.Socket.HasIn)
{
var msg = e.Socket.ReceiveMessage(false);
var msg = e.Socket.ReceiveMultipartMessage();
var idRouter = msg.Pop();
// forget the empty frame
if (msg.First.IsEmpty)
Expand All @@ -651,7 +651,7 @@ public void InprocRouterDealerTest()
{
// worker send RDY message queue his Identity to the free workers queue
if (s_ReadyMsg[0] ==msg[0].Buffer[0] &&
s_ReadyMsg[1] ==msg[0].Buffer[1] &&
s_ReadyMsg[1] ==msg[0].Buffer[1] &&
s_ReadyMsg[2] ==msg[0].Buffer[2])
{
lock (s_FreeWorkers)
Expand All @@ -676,7 +676,7 @@ public void InprocRouterDealerTest()
{
workerSocket.Options.Identity = workerId;
workerSocket.Connect("inproc://backend");
var workerReadyMsg = new NetMQMessage();
workerReadyMsg.Append(workerId);
workerReadyMsg.AppendEmptyFrame();
Expand All @@ -690,7 +690,7 @@ public void InprocRouterDealerTest()
workerThread.Start(backendsRouter.Options.Identity);
workers.Add(workerThread);
}

poller.PollTillCancelledNonBlocking();
Thread.Sleep(1000);
poller.CancelAndJoin();
Expand Down
20 changes: 10 additions & 10 deletions src/NetMQ/Core/MonitorEvent.cs
Expand Up @@ -11,7 +11,7 @@ internal class MonitorEvent
private const int ValueInteger = 1;
private const int ValueChannel = 2;

private readonly SocketEvent m_monitorEvent;
private readonly SocketEvents m_monitorEvent;
private readonly string m_addr;
[CanBeNull] private readonly Object m_arg;
private readonly int m_flag;
Expand All @@ -26,19 +26,19 @@ static MonitorEvent()
s_sizeOfIntPtr = 8;
}

public MonitorEvent(SocketEvent monitorEvent, [NotNull] string addr, ErrorCode arg)
public MonitorEvent(SocketEvents monitorEvent, [NotNull] string addr, ErrorCode arg)
: this(monitorEvent, addr, (int)arg)
{}

public MonitorEvent(SocketEvent monitorEvent, [NotNull] string addr, int arg)
public MonitorEvent(SocketEvents monitorEvent, [NotNull] string addr, int arg)
: this(monitorEvent, addr, (object)arg)
{}

public MonitorEvent(SocketEvent monitorEvent, [NotNull] string addr, AsyncSocket arg)
public MonitorEvent(SocketEvents monitorEvent, [NotNull] string addr, AsyncSocket arg)
: this(monitorEvent, addr, (object)arg)
{}

private MonitorEvent(SocketEvent monitorEvent, [NotNull] string addr, [NotNull] Object arg)
private MonitorEvent(SocketEvents monitorEvent, [NotNull] string addr, [NotNull] Object arg)
{
m_monitorEvent = monitorEvent;
m_addr = addr;
Expand All @@ -64,15 +64,15 @@ public object Arg
get { return m_arg; }
}

public SocketEvent Event
public SocketEvents Event
{
get { return m_monitorEvent; }
}

public void Write([NotNull] SocketBase s)
{
int size = 4 + 1 + m_addr.Length + 1; // event + len(addr) + addr + flag

if (m_flag == ValueInteger)
size += 4;
else if (m_flag == ValueChannel)
Expand Down Expand Up @@ -121,7 +121,7 @@ public static MonitorEvent Read([NotNull] SocketBase s)
int pos = 0;
ByteArraySegment data = msg.Data;

var @event = (SocketEvent)data.GetInteger(Endianness.Little, pos);
var @event = (SocketEvents)data.GetInteger(Endianness.Little, pos);
pos += 4;
var len = (int)data[pos++];
string addr = data.GetString(len, pos);
Expand All @@ -135,8 +135,8 @@ public static MonitorEvent Read([NotNull] SocketBase s)
}
else if (flag == ValueChannel)
{
IntPtr value = s_sizeOfIntPtr == 4
? new IntPtr(data.GetInteger(Endianness.Little, pos))
IntPtr value = s_sizeOfIntPtr == 4
? new IntPtr(data.GetInteger(Endianness.Little, pos))
: new IntPtr(data.GetLong(Endianness.Little, pos));

GCHandle handle = GCHandle.FromIntPtr(value);
Expand Down
44 changes: 22 additions & 22 deletions src/NetMQ/Core/SocketBase.cs
Expand Up @@ -79,7 +79,7 @@ internal abstract class SocketBase : Own, IPollEvents, Pipe.IPipeEvents
private SocketBase m_monitorSocket;

/// <summary>Bitmask of events being monitored.</summary>
private SocketEvent m_monitorEvents;
private SocketEvents m_monitorEvents;

/// <summary>The tcp port that was bound to, if any.</summary>
private int m_port;
Expand Down Expand Up @@ -1376,7 +1376,7 @@ private void ExtractFlags(ref Msg msg)
/// <exception cref="NetMQException">Maximum number of sockets reached.</exception>
/// <exception cref="ProtocolNotSupportedException">The protocol of <paramref name="addr"/> is not supported.</exception>
/// <exception cref="TerminatingException">The socket has been stopped.</exception>
public void Monitor([CanBeNull] string addr, SocketEvent events)
public void Monitor([CanBeNull] string addr, SocketEvents events)
{
CheckContextTerminated();

Expand Down Expand Up @@ -1431,82 +1431,82 @@ public void Monitor([CanBeNull] string addr, SocketEvent events)

public void EventConnected([NotNull] string addr, [NotNull] AsyncSocket ch)
{
if ((m_monitorEvents & SocketEvent.Connected) == 0)
if ((m_monitorEvents & SocketEvents.Connected) == 0)
return;

MonitorEvent(new MonitorEvent(SocketEvent.Connected, addr, ch));
MonitorEvent(new MonitorEvent(SocketEvents.Connected, addr, ch));
}

public void EventConnectDelayed([NotNull] string addr, ErrorCode errno)
{
if ((m_monitorEvents & SocketEvent.ConnectDelayed) == 0)
if ((m_monitorEvents & SocketEvents.ConnectDelayed) == 0)
return;

MonitorEvent(new MonitorEvent(SocketEvent.ConnectDelayed, addr, errno));
MonitorEvent(new MonitorEvent(SocketEvents.ConnectDelayed, addr, errno));
}

public void EventConnectRetried([NotNull] string addr, int interval)
{
if ((m_monitorEvents & SocketEvent.ConnectRetried) == 0)
if ((m_monitorEvents & SocketEvents.ConnectRetried) == 0)
return;

MonitorEvent(new MonitorEvent(SocketEvent.ConnectRetried, addr, interval));
MonitorEvent(new MonitorEvent(SocketEvents.ConnectRetried, addr, interval));
}

public void EventListening([NotNull] string addr, [NotNull] AsyncSocket ch)
{
if ((m_monitorEvents & SocketEvent.Listening) == 0)
if ((m_monitorEvents & SocketEvents.Listening) == 0)
return;

MonitorEvent(new MonitorEvent(SocketEvent.Listening, addr, ch));
MonitorEvent(new MonitorEvent(SocketEvents.Listening, addr, ch));
}

public void EventBindFailed([NotNull] string addr, ErrorCode errno)
{
if ((m_monitorEvents & SocketEvent.BindFailed) == 0)
if ((m_monitorEvents & SocketEvents.BindFailed) == 0)
return;

MonitorEvent(new MonitorEvent(SocketEvent.BindFailed, addr, errno));
MonitorEvent(new MonitorEvent(SocketEvents.BindFailed, addr, errno));
}

public void EventAccepted([NotNull] string addr, [NotNull] AsyncSocket ch)
{
if ((m_monitorEvents & SocketEvent.Accepted) == 0)
if ((m_monitorEvents & SocketEvents.Accepted) == 0)
return;

MonitorEvent(new MonitorEvent(SocketEvent.Accepted, addr, ch));
MonitorEvent(new MonitorEvent(SocketEvents.Accepted, addr, ch));
}

public void EventAcceptFailed([NotNull] string addr, ErrorCode errno)
{
if ((m_monitorEvents & SocketEvent.AcceptFailed) == 0)
if ((m_monitorEvents & SocketEvents.AcceptFailed) == 0)
return;

MonitorEvent(new MonitorEvent(SocketEvent.AcceptFailed, addr, errno));
MonitorEvent(new MonitorEvent(SocketEvents.AcceptFailed, addr, errno));
}

public void EventClosed([NotNull] string addr, [NotNull] AsyncSocket ch)
{
if ((m_monitorEvents & SocketEvent.Closed) == 0)
if ((m_monitorEvents & SocketEvents.Closed) == 0)
return;

MonitorEvent(new MonitorEvent(SocketEvent.Closed, addr, ch));
MonitorEvent(new MonitorEvent(SocketEvents.Closed, addr, ch));
}

public void EventCloseFailed([NotNull] string addr, ErrorCode errno)
{
if ((m_monitorEvents & SocketEvent.CloseFailed) == 0)
if ((m_monitorEvents & SocketEvents.CloseFailed) == 0)
return;

MonitorEvent(new MonitorEvent(SocketEvent.CloseFailed, addr, errno));
MonitorEvent(new MonitorEvent(SocketEvents.CloseFailed, addr, errno));
}

public void EventDisconnected([NotNull] string addr, [NotNull] AsyncSocket ch)
{
if ((m_monitorEvents & SocketEvent.Disconnected) == 0)
if ((m_monitorEvents & SocketEvents.Disconnected) == 0)
return;

MonitorEvent(new MonitorEvent(SocketEvent.Disconnected, addr, ch));
MonitorEvent(new MonitorEvent(SocketEvents.Disconnected, addr, ch));
}

private void MonitorEvent([NotNull] MonitorEvent monitorEvent)
Expand Down
214 changes: 132 additions & 82 deletions src/NetMQ/Monitoring/NetMQMonitor.cs
@@ -1,54 +1,104 @@
using System;
using System.Threading;
#if !NET35
using System.Threading.Tasks;
#endif
using AsyncIO;
using JetBrains.Annotations;
using NetMQ.Core;

namespace NetMQ.Monitoring
{
/// <summary>
/// Use this class when you want to monitor a socket.
/// Monitors a <see cref="NetMQSocket"/> for events, raising them via events.
/// </summary>
/// <remarks>
/// To run a monitor instance, either:
/// <list type="bullet">
/// <item>Call <see cref="Start"/> (blocking) and <see cref="Stop"/>, or</item>
/// <item>Call <see cref="AttachToPoller"/> and <see cref="DetachFromPoller"/>.</item>
/// </list>
/// </remarks>
public class NetMQMonitor : IDisposable
{
private readonly bool m_isOwner;
private Poller m_attachedPoller;

[NotNull] private readonly NetMQSocket m_monitoringSocket;
private readonly bool m_ownsMonitoringSocket;
[CanBeNull] private Poller m_attachedPoller;
private int m_cancel;

private readonly ManualResetEvent m_isStoppedEvent = new ManualResetEvent(true);

[Obsolete("Use overload that takes SocketEvents (plural)")]
public NetMQMonitor([NotNull] NetMQContext context, [NotNull] NetMQSocket monitoredSocket, [NotNull] string endpoint, SocketEvent eventsToMonitor)
: this(context, monitoredSocket, endpoint, (SocketEvents)eventsToMonitor)
{}

public NetMQMonitor([NotNull] NetMQContext context, [NotNull] NetMQSocket monitoredSocket, [NotNull] string endpoint, SocketEvents eventsToMonitor)
{
Endpoint = endpoint;
Timeout = TimeSpan.FromSeconds(0.5);

monitoredSocket.Monitor(endpoint, eventsToMonitor);

MonitoringSocket = context.CreatePairSocket();
MonitoringSocket.Options.Linger = TimeSpan.Zero;
m_monitoringSocket = context.CreatePairSocket();
m_monitoringSocket.Options.Linger = TimeSpan.Zero;
m_monitoringSocket.ReceiveReady += Handle;

MonitoringSocket.ReceiveReady += Handle;

m_isOwner = true;
m_ownsMonitoringSocket = true;
}

/// <summary>
/// This constructor receives an already-created monitored socket. The other constructor is preferred; this one is here to support clrzmq signature
/// Initialises a monitor on <paramref name="socket"/> for a specified <paramref name="endpoint"/>.
/// </summary>
/// <param name="socket">this will be the monitoring socket</param>
/// <remarks>
/// This constructor matches the signature used by clrzmq.
/// </remarks>
/// <param name="socket">The socket to monitor.</param>
/// <param name="endpoint">a string denoting the endpoint which will be the monitoring address</param>
public NetMQMonitor([NotNull] NetMQSocket socket, [NotNull] string endpoint)
/// <param name="ownsSocket">
/// A flag indicating whether ownership of <paramref name="socket"/> is transferred to the monitor.
/// If <c>true</c>, disposing the monitor will also dispose <paramref name="socket"/>.
/// </param>
public NetMQMonitor([NotNull] NetMQSocket socket, [NotNull] string endpoint, bool ownsSocket = false)
{
Endpoint = endpoint;
Timeout = TimeSpan.FromSeconds(0.5);
MonitoringSocket = socket;

MonitoringSocket.ReceiveReady += Handle;
m_monitoringSocket = socket;
m_monitoringSocket.ReceiveReady += Handle;

m_isOwner = false;
m_ownsMonitoringSocket = ownsSocket;
}

/// <summary>
/// The monitoring address.
/// </summary>
public string Endpoint { get; private set; }

/// <summary>
/// Get whether this monitor is currently running.
/// </summary>
/// <remarks>
/// Start the monitor running via either <see cref="Start"/> or <see cref="AttachToPoller"/>.
/// Stop the monitor via either <see cref="Stop"/> or <see cref="DetachFromPoller"/>.
/// </remarks>
public bool IsRunning { get; private set; }

/// <summary>
/// Gets and sets the timeout interval for poll iterations when using <see cref="Start"/> and <see cref="Stop"/>.
/// </summary>
/// <remarks>
/// The higher the number the longer it may take the to stop the monitor.
/// This value has no effect when the monitor is run via <see cref="AttachToPoller"/>.
/// </remarks>
public TimeSpan Timeout { get; set; }

#region Events

/// <summary>
/// Raised whenever any monitored event fires.
/// </summary>
public event EventHandler<NetMQMonitorEventArgs> EventReceived;

/// <summary>
/// Occurs when a connection is made to a socket.
/// </summary>
Expand Down Expand Up @@ -85,7 +135,7 @@ public NetMQMonitor([NotNull] NetMQSocket socket, [NotNull] string endpoint)
public event EventHandler<NetMQMonitorErrorEventArgs> AcceptFailed;

/// <summary>
/// Occurs when a connection was closed.
/// Occurs when a connection was closed.
/// </summary>
public event EventHandler<NetMQMonitorSocketEventArgs> Closed;

Expand All @@ -95,93 +145,76 @@ public NetMQMonitor([NotNull] NetMQSocket socket, [NotNull] string endpoint)
public event EventHandler<NetMQMonitorErrorEventArgs> CloseFailed;

/// <summary>
/// Occurs when the stream engine (tcp and ipc specific) detects a corrupted / broken session.
/// Occurs when the stream engine (TCP and IPC specific) detects a corrupted / broken session.
/// </summary>
public event EventHandler<NetMQMonitorSocketEventArgs> Disconnected;

/// <summary>
/// The monitoring address
/// </summary>
public string Endpoint { get; private set; }

/// <summary>
/// Monitoring socket created by the init method
/// </summary>
internal NetMQSocket MonitoringSocket { get; private set; }

/// <summary>
/// Get whether this monitor is currently running.
/// This is set within Start and AttachToPoller, and cleared within DetachFromPoller.
/// </summary>
public bool IsRunning { get; private set; }

/// <summary>
/// How much time to wait on each poll iteration, the higher the number the longer it will take the poller to stop
/// </summary>
public TimeSpan Timeout { get; set; }
#endregion

internal void Handle(object sender, NetMQSocketEventArgs socketEventArgs)
private void Handle(object sender, NetMQSocketEventArgs socketEventArgs)
{
MonitorEvent monitorEvent = MonitorEvent.Read(MonitoringSocket.SocketHandle);
var monitorEvent = MonitorEvent.Read(m_monitoringSocket.SocketHandle);

switch (monitorEvent.Event)
{
case SocketEvent.Connected:
InvokeEvent(Connected, new NetMQMonitorSocketEventArgs(this, monitorEvent.Addr, (AsyncSocket)monitorEvent.Arg));
case SocketEvents.Connected:
InvokeEvent(Connected, new NetMQMonitorSocketEventArgs(this, monitorEvent.Addr, (AsyncSocket)monitorEvent.Arg, SocketEvents.Connected));
break;
case SocketEvent.ConnectDelayed:
InvokeEvent(ConnectDelayed, new NetMQMonitorErrorEventArgs(this, monitorEvent.Addr, (ErrorCode)monitorEvent.Arg));
case SocketEvents.ConnectDelayed:
InvokeEvent(ConnectDelayed, new NetMQMonitorErrorEventArgs(this, monitorEvent.Addr, (ErrorCode)monitorEvent.Arg, SocketEvents.ConnectDelayed));
break;
case SocketEvent.ConnectRetried:
InvokeEvent(ConnectRetried, new NetMQMonitorIntervalEventArgs(this, monitorEvent.Addr, (int)monitorEvent.Arg));
case SocketEvents.ConnectRetried:
InvokeEvent(ConnectRetried, new NetMQMonitorIntervalEventArgs(this, monitorEvent.Addr, (int)monitorEvent.Arg, SocketEvents.ConnectRetried));
break;
case SocketEvent.Listening:
InvokeEvent(Listening, new NetMQMonitorSocketEventArgs(this, monitorEvent.Addr, (AsyncSocket)monitorEvent.Arg));
case SocketEvents.Listening:
InvokeEvent(Listening, new NetMQMonitorSocketEventArgs(this, monitorEvent.Addr, (AsyncSocket)monitorEvent.Arg, SocketEvents.Listening));
break;
case SocketEvent.BindFailed:
InvokeEvent(BindFailed, new NetMQMonitorErrorEventArgs(this, monitorEvent.Addr, (ErrorCode)monitorEvent.Arg));
case SocketEvents.BindFailed:
InvokeEvent(BindFailed, new NetMQMonitorErrorEventArgs(this, monitorEvent.Addr, (ErrorCode)monitorEvent.Arg, SocketEvents.BindFailed));
break;
case SocketEvent.Accepted:
InvokeEvent(Accepted, new NetMQMonitorSocketEventArgs(this, monitorEvent.Addr, (AsyncSocket)monitorEvent.Arg));
case SocketEvents.Accepted:
InvokeEvent(Accepted, new NetMQMonitorSocketEventArgs(this, monitorEvent.Addr, (AsyncSocket)monitorEvent.Arg, SocketEvents.Accepted));
break;
case SocketEvent.AcceptFailed:
InvokeEvent(AcceptFailed, new NetMQMonitorErrorEventArgs(this, monitorEvent.Addr, (ErrorCode)monitorEvent.Arg));
case SocketEvents.AcceptFailed:
InvokeEvent(AcceptFailed, new NetMQMonitorErrorEventArgs(this, monitorEvent.Addr, (ErrorCode)monitorEvent.Arg, SocketEvents.AcceptFailed));
break;
case SocketEvent.Closed:
InvokeEvent(Closed, new NetMQMonitorSocketEventArgs(this, monitorEvent.Addr, (AsyncSocket)monitorEvent.Arg));
case SocketEvents.Closed:
InvokeEvent(Closed, new NetMQMonitorSocketEventArgs(this, monitorEvent.Addr, (AsyncSocket)monitorEvent.Arg, SocketEvents.Closed));
break;
case SocketEvent.CloseFailed:
InvokeEvent(CloseFailed, new NetMQMonitorErrorEventArgs(this, monitorEvent.Addr, (ErrorCode)monitorEvent.Arg));
case SocketEvents.CloseFailed:
InvokeEvent(CloseFailed, new NetMQMonitorErrorEventArgs(this, monitorEvent.Addr, (ErrorCode)monitorEvent.Arg, SocketEvents.CloseFailed));
break;
case SocketEvent.Disconnected:
InvokeEvent(Disconnected, new NetMQMonitorSocketEventArgs(this, monitorEvent.Addr, (AsyncSocket)monitorEvent.Arg));
case SocketEvents.Disconnected:
InvokeEvent(Disconnected, new NetMQMonitorSocketEventArgs(this, monitorEvent.Addr, (AsyncSocket)monitorEvent.Arg, SocketEvents.Disconnected));
break;
default:
throw new Exception("unknown event " + monitorEvent.Event.ToString());
throw new Exception("unknown event " + monitorEvent.Event);
}
}

private void InvokeEvent<T>(EventHandler<T> handler, T args) where T : NetMQMonitorEventArgs
{
var temp = handler;
if (temp != null)
{
temp(this, args);
}
var t1 = EventReceived;
if (t1 != null)
t1(this, args);

var t2 = handler;
if (t2 != null)
t2(this, args);
}

private void InternalStart()
{
m_isStoppedEvent.Reset();
IsRunning = true;
MonitoringSocket.Connect(Endpoint);
m_monitoringSocket.Connect(Endpoint);
}

private void InternalClose()
{
try
{
MonitoringSocket.Disconnect(Endpoint);
m_monitoringSocket.Disconnect(Endpoint);
}
catch (Exception)
{}
Expand All @@ -196,12 +229,12 @@ public void AttachToPoller([NotNull] Poller poller)
{
InternalStart();
m_attachedPoller = poller;
poller.AddSocket(MonitoringSocket);
poller.AddSocket(m_monitoringSocket);
}

public void DetachFromPoller()
{
m_attachedPoller.RemoveSocket(MonitoringSocket);
m_attachedPoller.RemoveSocket(m_monitoringSocket);
m_attachedPoller = null;
InternalClose();
}
Expand All @@ -216,22 +249,18 @@ public void Start()
Thread.MemoryBarrier();

if (IsRunning)
{
throw new InvalidOperationException("Monitor already started");
}

if (m_attachedPoller != null)
{
throw new InvalidOperationException("Monitor attached to a poller");
}

InternalStart();

try
{
InternalStart();

while (m_cancel == 0)
{
MonitoringSocket.Poll(Timeout);
m_monitoringSocket.Poll(Timeout);
}
}
finally
Expand All @@ -240,21 +269,38 @@ public void Start()
}
}

#if !NET35
/// <summary>
/// Stop the socket monitoring
/// Start a background task for the monitoring operation.
/// </summary>
/// <returns></returns>
public Task StartAsync()
{
if (IsRunning)
throw new InvalidOperationException("Monitor already started");

if (m_attachedPoller != null)
throw new InvalidOperationException("Monitor attached to a poller");

return Task.Factory.StartNew(Start);
}
#endif

/// <summary>
/// Stop monitoring. Blocks until monitoring completed.
/// </summary>
/// <exception cref="InvalidOperationException">If this monitor is attached to a poller you must detach it first and not use the stop method.</exception>
public void Stop()
{
if (m_attachedPoller != null)
{
throw new InvalidOperationException("Monitor attached to a poller, please detach from poller and don't use the stop method");
}

Interlocked.Exchange(ref m_cancel, 1);
m_isStoppedEvent.WaitOne();
}

#region Dispose

/// <summary>
/// Release and dispose of any contained resources.
/// </summary>
Expand Down Expand Up @@ -282,12 +328,16 @@ protected virtual void Dispose(bool disposing)
Stop();
}

m_monitoringSocket.ReceiveReady -= Handle;

m_isStoppedEvent.Close();

if (m_isOwner)
if (m_ownsMonitoringSocket)
{
MonitoringSocket.Dispose();
m_monitoringSocket.Dispose();
}
}

#endregion
}
}
66 changes: 38 additions & 28 deletions src/NetMQ/Monitoring/NetMQMonitorEventArgs.cs
Expand Up @@ -5,100 +5,110 @@
namespace NetMQ.Monitoring
{
/// <summary>
/// This is an EventArgs that also contains a NetMQMonitor and a string Address.
/// Base class for all event arguments raised by <see cref="NetMQMonitor"/>.
/// </summary>
public class NetMQMonitorEventArgs : EventArgs
public abstract class NetMQMonitorEventArgs : EventArgs
{
/// <summary>
/// Create a new NetMQMonitorEventArgs that contains the given monitor and address.
/// </summary>
/// <param name="monitor">a NetMQMonitor for this to hold</param>
/// <param name="address">a string address for this to hold</param>
public NetMQMonitorEventArgs([NotNull] NetMQMonitor monitor, [NotNull] string address)
/// <param name="monitor">The <see cref="NetMQMonitor"/> that raised this event.</param>
/// <param name="address">The address of the event.</param>
/// <param name="socketEvent">The type of socket event that occurred.</param>
protected NetMQMonitorEventArgs([NotNull] NetMQMonitor monitor, [NotNull] string address, SocketEvents socketEvent)
{
Monitor = monitor;
Address = address;
SocketEvent = socketEvent;
}

/// <summary>
/// Get the NetMQMonitor that this NetMQMonitorEventArgs is holding.
/// Gets the <see cref="NetMQMonitor"/> that raised this event.
/// </summary>
[NotNull]
public NetMQMonitor Monitor { get; private set; }

/// <summary>
/// Get the address, as a string, that this NetMQMonitorEventArgs is holding.
/// Gets the address of the event.
/// </summary>
[NotNull]
public string Address { get; private set; }

/// <summary>
/// Gets the type of socket event that occurred.
/// </summary>
public SocketEvents SocketEvent { get; private set; }
}

/// <summary>
/// A NetMQMonitorSocketEventArgs is a subclass of NetMQMonitorEventArgs that also holds a socket.
/// A subclass of <see cref="NetMQMonitorEventArgs"/> that also holds a socket.
/// </summary>
public class NetMQMonitorSocketEventArgs : NetMQMonitorEventArgs
{
/// <summary>
/// Create a new NetMQMonitorSocketEventArgs that contains the given monitor, address, and socket.
/// </summary>
/// <param name="monitor">the NetMQMonitor that this event concerns</param>
/// <param name="address">a string denoting the endpoint-address</param>
/// <param name="socket">the AsyncSocket in question</param>
public NetMQMonitorSocketEventArgs([NotNull] NetMQMonitor monitor, [NotNull] string address, [NotNull] AsyncSocket socket)
: base(monitor, address)
/// <param name="monitor">The <see cref="NetMQMonitor"/> that raised this event.</param>
/// <param name="address">The address of the event.</param>
/// <param name="socketEvent">The type of socket event that occurred.</param>
/// <param name="socket">The socket upon which this event occurred.</param>
public NetMQMonitorSocketEventArgs([NotNull] NetMQMonitor monitor, [NotNull] string address, [NotNull] AsyncSocket socket, SocketEvents socketEvent)
: base(monitor, address, socketEvent)
{
Socket = socket;
}

/// <summary>
/// Get the AsyncSocket that this is holding.
/// Gets the socket upon which this event occurred.
/// </summary>
[NotNull]
public AsyncSocket Socket { get; private set; }
}

/// <summary>
/// A NetMQMonitorErrorEventArgs is a subclass of NetMQMonitorEventArgs that also holds an ErrorCode.
/// A subclass of <see cref="NetMQMonitorEventArgs"/> that also holds an error code.
/// </summary>
public class NetMQMonitorErrorEventArgs : NetMQMonitorEventArgs
{
/// <summary>
/// Create a new NetMQMonitorErrorEventArgs that contains the given monitor, address, and error-code.
/// </summary>
/// <param name="monitor">the NetMQMonitor that this event concerns</param>
/// <param name="address">a string denoting the endpoint-address</param>
/// <param name="errorCode">the ErrorCode that is giving rise to this event</param>
public NetMQMonitorErrorEventArgs([NotNull] NetMQMonitor monitor, [NotNull] string address, ErrorCode errorCode)
: base(monitor, address)
/// <param name="monitor">The <see cref="NetMQMonitor"/> that raised this event.</param>
/// <param name="address">The address of the event.</param>
/// <param name="socketEvent">The type of socket event that occurred.</param>
/// <param name="errorCode">The error code associated with this event.</param>
public NetMQMonitorErrorEventArgs([NotNull] NetMQMonitor monitor, [NotNull] string address, ErrorCode errorCode, SocketEvents socketEvent)
: base(monitor, address, socketEvent)
{
ErrorCode = errorCode;
}

/// <summary>
/// Get the ErrorCode that this was constructed with.
/// Gets the error code associated with this event.
/// </summary>
public ErrorCode ErrorCode { get; private set; }
}

/// <summary>
/// A NetMQMonitorIntervalEventArgs is a subclass of NetMQMonitorEventArgs that also provides an Interval property to hold the reconnect-interval.
/// A subclass of <see cref="NetMQMonitorEventArgs"/> that also holds an interval.
/// </summary>
public class NetMQMonitorIntervalEventArgs : NetMQMonitorEventArgs
{
/// <summary>
/// Create a new NetMQMonitorIntervalEventArgs containing the given NetMQMonitor, address, and time-interval.
/// Create a new NetMQMonitorIntervalEventArgs containing the given NetMQMonitor, address, and interval.
/// </summary>
/// <param name="monitor">the NetMQMonitor</param>
/// <param name="address">a string denoting the address</param>
/// <param name="interval">the computed reconnect-interval</param>
public NetMQMonitorIntervalEventArgs([NotNull] NetMQMonitor monitor, [NotNull] string address, int interval)
: base(monitor, address)
/// <param name="address">The a string denoting the address</param>
/// <param name="interval">The interval, in milliseconds.</param>
/// <param name="socketEvent">The type of socket event that occurred.</param>
public NetMQMonitorIntervalEventArgs([NotNull] NetMQMonitor monitor, [NotNull] string address, int interval, SocketEvents socketEvent)
: base(monitor, address, socketEvent)
{
Interval = interval;
}

/// <summary>
/// Gets the computed reconnect interval.
/// Gets the interval, in milliseconds.
/// </summary>
public int Interval { get; private set; }
}
Expand Down
2 changes: 1 addition & 1 deletion src/NetMQ/NetMQ.csproj
Expand Up @@ -127,7 +127,7 @@
<Compile Include="Core\Address.cs" />
<Compile Include="Endianness.cs" />
<Compile Include="SendReceiveOptions.cs" />
<Compile Include="SocketEvent.cs" />
<Compile Include="SocketEvents.cs" />
<Compile Include="PollEvents.cs" />
<Compile Include="Core\Utils\AtomicCounter.cs" />
<Compile Include="Blob.cs" />
Expand Down
2 changes: 1 addition & 1 deletion src/NetMQ/NetMQ3.5.csproj
Expand Up @@ -159,7 +159,7 @@
<Compile Include="PollEvents.cs" />
<Compile Include="ReceivingSocketExtensions.cs" />
<Compile Include="SendReceiveOptions.cs" />
<Compile Include="SocketEvent.cs" />
<Compile Include="SocketEvents.cs" />
<Compile Include="Sockets\DealerSocket.cs" />
<Compile Include="Monitoring\NetMQMonitor.cs" />
<Compile Include="NetMQException.cs" />
Expand Down
2 changes: 1 addition & 1 deletion src/NetMQ/NetMQContext.cs
Expand Up @@ -261,7 +261,7 @@ public NetMQMonitor CreateMonitorSocket([NotNull] string endpoint)
throw new ArgumentException("Unable to monitor to an empty endpoint.", "endpoint");
}

return new NetMQMonitor(CreatePairSocket(), endpoint);
return new NetMQMonitor(CreatePairSocket(), endpoint, ownsSocket: true);
}

#endregion
Expand Down
17 changes: 17 additions & 0 deletions src/NetMQ/NetMQSocket.cs
Expand Up @@ -442,7 +442,24 @@ public virtual void Unsubscribe(byte[] topic)
/// <exception cref="ProtocolNotSupportedException">The protocol of <paramref name="endpoint"/> is not supported.</exception>
/// <exception cref="TerminatingException">The socket has been stopped.</exception>
/// <exception cref="NetMQException">Maximum number of sockets reached.</exception>
[Obsolete("Use overload that accepts SocketEvents (plural) instead")]
public void Monitor([NotNull] string endpoint, SocketEvent events = SocketEvent.All)
{
Monitor(endpoint, (SocketEvents)events);
}

/// <summary>
/// Listen to the given endpoint for SocketEvent events.
/// </summary>
/// <param name="endpoint">A string denoting the endpoint to monitor</param>
/// <param name="events">The specific <see cref="SocketEvent"/> events to report on. Defaults to <see cref="SocketEvent.All"/> if ommitted.</param>
/// <exception cref="ArgumentNullException"><paramref name="endpoint"/> is <c>null</c>.</exception>
/// <exception cref="ArgumentException"><paramref name="endpoint"/> cannot be empty or whitespace.</exception>
/// <exception cref="ObjectDisposedException">This object is already disposed.</exception>
/// <exception cref="ProtocolNotSupportedException">The protocol of <paramref name="endpoint"/> is not supported.</exception>
/// <exception cref="TerminatingException">The socket has been stopped.</exception>
/// <exception cref="NetMQException">Maximum number of sockets reached.</exception>
public void Monitor([NotNull] string endpoint, SocketEvents events = SocketEvents.All)
{
if (endpoint == null)
throw new ArgumentNullException("endpoint");
Expand Down
31 changes: 0 additions & 31 deletions src/NetMQ/SocketEvent.cs

This file was deleted.

59 changes: 59 additions & 0 deletions src/NetMQ/SocketEvents.cs
@@ -0,0 +1,59 @@
using System;

namespace NetMQ
{
/// <summary>
/// This enum-type specifies socket transport events (TCP and IPC only).
/// </summary>
[Flags]
public enum SocketEvents
{
Connected = 1,
ConnectDelayed = 2,
ConnectRetried = 4,

Listening = 8,
BindFailed = 16,

Accepted = 32,
AcceptFailed = 64,

Closed = 128,
CloseFailed = 256,
Disconnected = 512,

All = Connected | ConnectDelayed |
ConnectRetried | Listening |
BindFailed | Accepted |
AcceptFailed | Closed |
CloseFailed | Disconnected
}

/// <summary>
/// This enum-type specifies socket transport events (TCP and IPC only).
/// </summary>
[Flags]
[Obsolete("Use SocketEvents (plural) instead")]
public enum SocketEvent
{
Connected = 1,
ConnectDelayed = 2,
ConnectRetried = 4,

Listening = 8,
BindFailed = 16,

Accepted = 32,
AcceptFailed = 64,

Closed = 128,
CloseFailed = 256,
Disconnected = 512,

All = Connected | ConnectDelayed |
ConnectRetried | Listening |
BindFailed | Accepted |
AcceptFailed | Closed |
CloseFailed | Disconnected
}
}
4 changes: 2 additions & 2 deletions src/Performance/NetMQ.SimpleTests/LatencyBenchmark.cs
Expand Up @@ -123,7 +123,7 @@ public LatencyBenchmarkReusingMsg()

protected override long DoClient(NetMQSocket socket, int messageSize)
{
var msg = new Msg();
var msg = new Msg();
var watch = Stopwatch.StartNew();

for (int i = 0; i < Iterations; i++)
Expand All @@ -144,7 +144,7 @@ protected override void DoServer(NetMQSocket socket, int messageSize)

for (int i = 0; i < Iterations; i++)
{
socket.Receive(ref msg, SendReceiveOptions.None);
socket.Receive(ref msg);

socket.Send(ref msg, SendReceiveOptions.None);
}
Expand Down
Expand Up @@ -11,6 +11,8 @@
using NetMQ;
using NUnit.Framework;

#pragma warning disable 4014

namespace MajordomoTests
{
[TestFixture]
Expand Down Expand Up @@ -312,7 +314,7 @@ public async void Run_ReceiveREPLYMessageFromThreeDifferentWorker_ShouldLogAndRe
// the task completes when the message exchange is done
Task.WaitAll (client01Task, client02Task, client03Task);
// cancel the broker

cts.Cancel ();

Assert.That (log.Count, Is.EqualTo (4));
Expand Down
Expand Up @@ -19,7 +19,9 @@ public class FakeCloseMDPWorker : IMDPWorker

public TimeSpan ReconnectDelay { get; set; }

#pragma warning disable 67
public event EventHandler<MDPLogEventArgs> LogInfoReady;
#pragma warning restore 67

public NetMQMessage Receive (NetMQMessage reply)
{
Expand Down
Expand Up @@ -49,6 +49,8 @@ public NetMQMessage Send (string serviceName, NetMQMessage request)
return request; // as echo service :-)
}

#pragma warning disable 67
public event EventHandler<MDPLogEventArgs> LogInfoReady;
#pragma warning restore 67
}
}
Expand Up @@ -18,7 +18,9 @@ public class FakeReplyMDPWorker : IMDPWorker

public TimeSpan ReconnectDelay { get; set; }

#pragma warning disable 67
public event EventHandler<MDPLogEventArgs> LogInfoReady;
#pragma warning restore 67

public NetMQMessage Receive (NetMQMessage reply)
{
Expand Down
Expand Up @@ -18,7 +18,9 @@ public class FakeRequestMDPWorker : IMDPWorker

public TimeSpan ReconnectDelay { get; set; }

#pragma warning disable 67
public event EventHandler<MDPLogEventArgs> LogInfoReady;
#pragma warning restore 67

public NetMQMessage Receive (NetMQMessage reply)
{
Expand Down