Skip to content
Browse files

Add SocketManager to manage disposing of sockets

  • Loading branch information...
1 parent 7f2fa25 commit 9490720e3ed6c0ab31980d6acb318ee3d9b7e7d3 @seif committed
View
1 src/NHibernate.ZMQLogPublisher/NHibernate.ZMQLogPublisher.csproj
@@ -60,6 +60,7 @@
<Compile Include="LogDetails.cs" />
<Compile Include="Publisher.cs" />
<Compile Include="Properties\AssemblyInfo.cs" />
+ <Compile Include="SocketManager.cs" />
<Compile Include="ZmqLogger.cs" />
<Compile Include="ZmqLoggerFactory.cs" />
</ItemGroup>
View
25 src/NHibernate.ZMQLogPublisher/Publisher.cs
@@ -1,25 +1,20 @@
namespace NHibernate.ZMQLogPublisher
{
using System.Text;
+ using System.Threading;
using System.Threading.Tasks;
using ZMQ;
public class Publisher
{
- private static Context context;
+ private static SocketManager socketManager;
private static bool running;
private static bool stopping;
- public static bool Running
- {
- get
- {
- return running;
- }
- }
+ private static Thread publisherThread;
public static void Start()
{
@@ -28,15 +23,16 @@ public static void Start()
public static void Start(int port)
{
- context = new Context(1);
+ socketManager = new SocketManager();
- new Task(() => ListenAndPublishLogMessages(port)).Start();
+ publisherThread = new Thread(() => ListenAndPublishLogMessages(port));
+ publisherThread.Start();
while(!running)
{
}
- LoggerProvider.SetLoggersFactory(new ZmqLoggerFactory(context));
+ LoggerProvider.SetLoggersFactory(new ZmqLoggerFactory(socketManager));
}
public static void Shutdown()
@@ -51,7 +47,8 @@ public static void Shutdown()
private static void ListenAndPublishLogMessages(int port)
{
- using (Socket publisher = context.Socket(SocketType.PUB), loggers = context.Socket(SocketType.PULL))
+ using (Socket publisher = socketManager.Context.Socket(SocketType.PUB),
+ loggers = socketManager.Context.Socket(SocketType.PULL))
{
publisher.Bind(string.Format("tcp://*:{0}", port));
publisher.Linger = 0;
@@ -62,7 +59,7 @@ private static void ListenAndPublishLogMessages(int port)
while (running && !stopping)
{
- var logMessage = loggers.Recv(Encoding.Unicode, timeout: 1000);
+ var logMessage = loggers.Recv(Encoding.Unicode, SendRecvOpt.NOBLOCK);
if (logMessage != null)
{
publisher.Send(logMessage, Encoding.Unicode);
@@ -71,7 +68,7 @@ private static void ListenAndPublishLogMessages(int port)
}
running = false;
- context.Dispose();
+ socketManager.Terminate();
}
}
}
View
68 src/NHibernate.ZMQLogPublisher/SocketManager.cs
@@ -0,0 +1,68 @@
+namespace NHibernate.ZMQLogPublisher
+{
+ using System.Collections.Concurrent;
+ using System.Collections.Generic;
+
+ using ZMQ;
+
+ public class SocketManager
+ {
+ private readonly Context context;
+
+ private ConcurrentDictionary<string, Socket> sockets;
+
+ private object synclock = new object();
+
+ private bool terminated;
+
+ public SocketManager()
+ {
+ this.context = new Context(1);
+ this.sockets = new ConcurrentDictionary<string, Socket>();
+ }
+
+ public bool Terminated
+ {
+ get
+ {
+ return this.terminated;
+ }
+ }
+
+ public Context Context
+ {
+ get
+ {
+ return this.context;
+ }
+ }
+
+ public Socket CreateSocketForKey(string loggerKey, SocketType socketType)
+ {
+ return this.sockets.GetOrAdd(
+ loggerKey,
+ key =>
+ {
+ var socket = this.Context.Socket(socketType);
+ this.sockets.TryAdd(loggerKey, socket);
+ return socket;
+ });
+ }
+
+ public void Terminate()
+ {
+ this.terminated = true;
+
+ foreach (var socket in sockets.Values)
+ {
+ lock (this.synclock)
+ {
+ socket.Dispose();
+ }
+ }
+
+ sockets.Clear();
+ this.Context.Dispose();
+ }
+ }
+}
View
31 src/NHibernate.ZMQLogPublisher/ZmqLogger.cs
@@ -14,18 +14,18 @@ public class ZmqLogger : IInternalLogger
{
private string keyName;
- private readonly Context context;
-
- private bool contextDisposed;
+ private readonly SocketManager socketManager;
private Socket sender;
- public ZmqLogger(string keyName, Context context)
+ private object socketLock = new object();
+
+ public ZmqLogger(string keyName, SocketManager socketManager)
{
this.keyName = keyName;
- this.context = context;
+ this.socketManager = socketManager;
- this.sender = this.context.Socket(SocketType.PUSH);
+ this.sender = this.socketManager.CreateSocketForKey(keyName, SocketType.PUSH);
this.sender.Linger = 0;
@@ -47,26 +47,15 @@ private void Publish(string message, Exception exception)
};
string serializedLogDetails = logDetails.ToJson();
-
- try
+
+ lock (socketLock)
{
- if (!this.contextDisposed)
+ if (!this.socketManager.Terminated)
{
this.sender.Send(serializedLogDetails, Encoding.Unicode);
}
}
- catch (ZMQ.Exception e)
- {
- // when publisher is shutdown, sockets throw exception
- // with error ETERM. Can't find a cleaner way to check if
- // context is disposed.
- if (e.Errno == (int)ERRNOS.ETERM)
- {
- return;
- }
-
- throw;
- }
+
}
public void Error(object message)
View
8 src/NHibernate.ZMQLogPublisher/ZmqLoggerFactory.cs
@@ -6,16 +6,16 @@ namespace NHibernate.ZMQLogPublisher
public class ZmqLoggerFactory : ILoggerFactory
{
- private Context context;
+ private readonly SocketManager socketManager;
- public ZmqLoggerFactory(Context context)
+ public ZmqLoggerFactory(SocketManager socketManager)
{
- this.context = context;
+ this.socketManager = socketManager;
}
public IInternalLogger LoggerFor(string keyName)
{
- return new ZmqLogger(keyName, context);
+ return new ZmqLogger(keyName, this.socketManager);
}
public IInternalLogger LoggerFor(Type type)

0 comments on commit 9490720

Please sign in to comment.
Something went wrong with that request. Please try again.