Skip to content

Comparing changes

Choose two branches to see what’s changed or to start a new pull request. If you need to, you can also .

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also .
...
  • 3 commits
  • 7 files changed
  • 0 commit comments
  • 1 contributor
View
9 src/NHibernate.ZMELogPublisher.Tests/PublisherTests.cs
@@ -62,12 +62,13 @@ public void RunBeforeEachTest()
[Ignore]
public void OpeningMultipleSessionsInDifferentThreads()
{
+
+ Publisher.Start();
+
int expectedSessions = 100;
subscriberTask = new Task(() => this.StartSubscriber(expectedSessions));
subscriberTask.Start(); // start subscriber to listen to messages
- Publisher.Start();
-
Task[] tasks = new Task[expectedSessions];
for (int i = 0; i < expectedSessions; i++)
{
@@ -159,6 +160,9 @@ private void OpenSessionAndSave(Animal animal)
tx.Commit();
}
+ var animals = session.QueryOver<Animal>().List();
+ var dogs = session.QueryOver<Dog>().List();
+ var lizards = session.QueryOver<Lizard>().List();
}
}
@@ -180,7 +184,6 @@ private void StartSubscriber(int expectedSessions)
message = subscriber.Recv(Encoding.Unicode, 10);
if (message != null)
{
- Console.WriteLine(message);
this.recievedMessages.Add(message);
}
}
View
1 src/NHibernate.ZMQLogPublisher/NHibernate.ZMQLogPublisher.csproj
@@ -60,6 +60,7 @@
<Compile Include="LogDetails.cs" />
<Compile Include="Publisher.cs" />
<Compile Include="Properties\AssemblyInfo.cs" />
+ <Compile Include="SocketManager.cs" />
<Compile Include="ZmqLogger.cs" />
<Compile Include="ZmqLoggerFactory.cs" />
</ItemGroup>
View
25 src/NHibernate.ZMQLogPublisher/Publisher.cs
@@ -1,25 +1,20 @@
namespace NHibernate.ZMQLogPublisher
{
using System.Text;
+ using System.Threading;
using System.Threading.Tasks;
using ZMQ;
public class Publisher
{
- private static Context context;
+ private static SocketManager socketManager;
private static bool running;
private static bool stopping;
- public static bool Running
- {
- get
- {
- return running;
- }
- }
+ private static Thread publisherThread;
public static void Start()
{
@@ -28,15 +23,16 @@ public static void Start()
public static void Start(int port)
{
- context = new Context(1);
+ socketManager = new SocketManager();
- new Task(() => ListenAndPublishLogMessages(port)).Start();
+ publisherThread = new Thread(() => ListenAndPublishLogMessages(port));
+ publisherThread.Start();
while(!running)
{
}
- LoggerProvider.SetLoggersFactory(new ZmqLoggerFactory(context));
+ LoggerProvider.SetLoggersFactory(new ZmqLoggerFactory(socketManager));
}
public static void Shutdown()
@@ -51,7 +47,8 @@ public static void Shutdown()
private static void ListenAndPublishLogMessages(int port)
{
- using (Socket publisher = context.Socket(SocketType.PUB), loggers = context.Socket(SocketType.PULL))
+ using (Socket publisher = socketManager.Context.Socket(SocketType.PUB),
+ loggers = socketManager.Context.Socket(SocketType.PULL))
{
publisher.Bind(string.Format("tcp://*:{0}", port));
publisher.Linger = 0;
@@ -62,7 +59,7 @@ private static void ListenAndPublishLogMessages(int port)
while (running && !stopping)
{
- var logMessage = loggers.Recv(Encoding.Unicode, timeout: 1000);
+ var logMessage = loggers.Recv(Encoding.Unicode, SendRecvOpt.NOBLOCK);
if (logMessage != null)
{
publisher.Send(logMessage, Encoding.Unicode);
@@ -71,7 +68,7 @@ private static void ListenAndPublishLogMessages(int port)
}
running = false;
- context.Dispose();
+ socketManager.Terminate();
}
}
}
View
68 src/NHibernate.ZMQLogPublisher/SocketManager.cs
@@ -0,0 +1,68 @@
+namespace NHibernate.ZMQLogPublisher
+{
+ using System.Collections.Concurrent;
+ using System.Collections.Generic;
+
+ using ZMQ;
+
+ public class SocketManager
+ {
+ private readonly Context context;
+
+ private ConcurrentDictionary<string, Socket> sockets;
+
+ private object synclock = new object();
+
+ private bool terminated;
+
+ public SocketManager()
+ {
+ this.context = new Context(1);
+ this.sockets = new ConcurrentDictionary<string, Socket>();
+ }
+
+ public bool Terminated
+ {
+ get
+ {
+ return this.terminated;
+ }
+ }
+
+ public Context Context
+ {
+ get
+ {
+ return this.context;
+ }
+ }
+
+ public Socket CreateSocketForKey(string loggerKey, SocketType socketType)
+ {
+ return this.sockets.GetOrAdd(
+ loggerKey,
+ key =>
+ {
+ var socket = this.Context.Socket(socketType);
+ this.sockets.TryAdd(loggerKey, socket);
+ return socket;
+ });
+ }
+
+ public void Terminate()
+ {
+ this.terminated = true;
+
+ foreach (var socket in sockets.Values)
+ {
+ lock (this.synclock)
+ {
+ socket.Dispose();
+ }
+ }
+
+ sockets.Clear();
+ this.Context.Dispose();
+ }
+ }
+}
View
31 src/NHibernate.ZMQLogPublisher/ZmqLogger.cs
@@ -14,18 +14,18 @@ public class ZmqLogger : IInternalLogger
{
private string keyName;
- private readonly Context context;
-
- private bool contextDisposed;
+ private readonly SocketManager socketManager;
private Socket sender;
- public ZmqLogger(string keyName, Context context)
+ private object socketLock = new object();
+
+ public ZmqLogger(string keyName, SocketManager socketManager)
{
this.keyName = keyName;
- this.context = context;
+ this.socketManager = socketManager;
- this.sender = this.context.Socket(SocketType.PUSH);
+ this.sender = this.socketManager.CreateSocketForKey(keyName, SocketType.PUSH);
this.sender.Linger = 0;
@@ -47,26 +47,15 @@ private void Publish(string message, Exception exception)
};
string serializedLogDetails = logDetails.ToJson();
-
- try
+
+ lock (socketLock)
{
- if (!this.contextDisposed)
+ if (!this.socketManager.Terminated)
{
this.sender.Send(serializedLogDetails, Encoding.Unicode);
}
}
- catch (ZMQ.Exception e)
- {
- // when publisher is shutdown, sockets throw exception
- // with error ETERM. Can't find a cleaner way to check if
- // context is disposed.
- if (e.Errno == (int)ERRNOS.ETERM)
- {
- return;
- }
-
- throw;
- }
+
}
public void Error(object message)
View
8 src/NHibernate.ZMQLogPublisher/ZmqLoggerFactory.cs
@@ -6,16 +6,16 @@ namespace NHibernate.ZMQLogPublisher
public class ZmqLoggerFactory : ILoggerFactory
{
- private Context context;
+ private readonly SocketManager socketManager;
- public ZmqLoggerFactory(Context context)
+ public ZmqLoggerFactory(SocketManager socketManager)
{
- this.context = context;
+ this.socketManager = socketManager;
}
public IInternalLogger LoggerFor(string keyName)
{
- return new ZmqLogger(keyName, context);
+ return new ZmqLogger(keyName, this.socketManager);
}
public IInternalLogger LoggerFor(Type type)
View
64 src/TestPublisher/Program.cs
@@ -5,6 +5,9 @@
namespace TestPublisher
{
+ using System.Diagnostics;
+ using System.Threading.Tasks;
+
using NHibernate.Cfg;
using NHibernate.Tool.hbm2ddl;
using NHibernate.ZMELogPublisher.Tests.TestData;
@@ -25,33 +28,46 @@ static void Main(string[] args)
using(var sessionFactory = config.BuildSessionFactory())
{
- using (var session = sessionFactory.OpenSession())
+ Stopwatch sw = new Stopwatch();
+ sw.Start();
+ Task[] tasks = new Task[50];
+ for (int i = 0; i < 50; i++)
{
- using (var tx = session.BeginTransaction())
- {
- session.Save(new Lizard() { SerialNumber = "11111", Description = "Saving lizard to get a new logger requested" });
-
- var dog = new Dog
- {
- BirthDate = DateTime.Now.AddYears(-1),
- BodyWeight = 10,
- Description = "Some dog",
- SerialNumber = "98765"
- };
- var puppy = new Dog
+ tasks[i] = new Task(() =>
{
- BirthDate = DateTime.Now,
- BodyWeight = 2,
- Description = "Some pup",
- SerialNumber = "9875"
- };
- dog.Children = new List<Animal>();
- dog.Children.Add(puppy);
- puppy.Mother = dog;
-
- tx.Commit();
- }
+ using (var session = sessionFactory.OpenSession())
+ {
+ using (var tx = session.BeginTransaction())
+ {
+ session.Save(new Lizard() { SerialNumber = "11111", Description = "Saving lizard to get a new logger requested" });
+
+ var dog = new Dog
+ {
+ BirthDate = DateTime.Now.AddYears(-1),
+ BodyWeight = 10,
+ Description = "Some dog",
+ SerialNumber = "98765"
+ };
+ var puppy = new Dog
+ {
+ BirthDate = DateTime.Now,
+ BodyWeight = 2,
+ Description = "Some pup",
+ SerialNumber = "9875"
+ };
+ dog.Children = new List<Animal>();
+ dog.Children.Add(puppy);
+ puppy.Mother = dog;
+
+ tx.Commit();
+ }
+ }
+ });
+ tasks[i].Start();
}
+
+ Task.WaitAll(tasks);
+ Console.WriteLine(sw.Elapsed);
Console.ReadLine();
}

No commit comments for this range

Something went wrong with that request. Please try again.