Skip to content

Commit

Permalink
Use ResetEvent, introduced Configuration and refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
seif committed Apr 10, 2012
1 parent 09bc9bf commit e6b54a5
Show file tree
Hide file tree
Showing 9 changed files with 266 additions and 133 deletions.
55 changes: 39 additions & 16 deletions src/NHibernate.ZMELogPublisher.Tests/PublisherTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

using ZMQ;

using Configuration = NHibernate.ZMQLogPublisher.Configuration;
using Exception = System.Exception;

[TestFixture]
Expand All @@ -31,12 +32,12 @@ public class PublisherTests

private Task subscriberTask;

private bool subscriberStarted;
private readonly ManualResetEvent subscriberReadyEvent= new ManualResetEvent(false);

[TestFixtureSetUp]
public void RunOnceBeforeAllTests()
{
var config = new Configuration();
var config = new NHibernate.Cfg.Configuration();
config.Configure("nh.sqlite.config");
config.SessionFactoryName("Test session factory");
config.AddAssembly(this.GetType().Assembly);
Expand All @@ -58,9 +59,33 @@ public void RunAfterAllTests()
public void RunBeforeEachTest()
{
timer.Change(5000, Timeout.Infinite);
subscriberStarted = false;
}

[Test]
public void WorksWithInprocSubscribers()
{

Context context = new Context(1);

Publisher.Start(new Publisher(
Configuration.LoadDefault()
.ConfigurePublisherSocket(s => s.Address = "inproc://publisher")
.ConfigureSyncSocket(s => s.Address = "inproc://sync"),
context));

this.subscriberTask = new Task(() => this.StartSubscriber(1, "inproc://publisher", "inproc://sync", context));
this.subscriberTask.Start(); // start subscriber to listen to messages

this.subscriberReadyEvent.WaitOne(5000);

this.OpenSessionAndSaveDogWithChild();
this.subscriberTask.Wait(); // wait until subscriber finished

Publisher.Stop();

Assert.AreEqual(1, this.recievedMessages.Count(m => m.Contains("opened session")), "Did not recieve session opened message for all sessions.");
}

[Test]
public void OpeningMultipleSessionsInDifferentThreads()
{
Expand All @@ -78,7 +103,7 @@ public void OpeningMultipleSessionsInDifferentThreads()

Task.WaitAll(tasks);

Publisher.Shutdown();
Publisher.Stop();
this.subscriberTask.Wait(); // wait until subscriber finished

Assert.AreEqual(expectedSessions, this.recievedMessages.Count(m => m.Contains("opened session")), "Did not recieve session opened message for all sessions.");
Expand All @@ -94,7 +119,7 @@ public void OpeningSessionPublishesEvent()
this.OpenSessionAndSaveDogWithChild();
this.subscriberTask.Wait(); // wait until subscriber finished

Publisher.Shutdown();
Publisher.Stop();

Assert.AreEqual(1, this.recievedMessages.Count(m => m.Contains("opened session")), "Did not recieve session opened message for all sessions.");
}
Expand All @@ -106,7 +131,7 @@ public void UsingNHibernateAfterShutingPublisherDownShouldNotThrow()
{
Publisher.Start();
OpenSessionAndSaveDogWithChild();
Publisher.Shutdown();
Publisher.Stop();
OpenSessionAndSaveDogWithChild();
OpenSessionAndSave(
Expand All @@ -131,12 +156,10 @@ private void AssertNoExceptionThrown(Action action)

private void StartSubscriberThread(int expectedSessions)
{
this.subscriberTask = new Task(() => this.StartSubscriber(expectedSessions));
this.subscriberTask = new Task(() => this.StartSubscriber(expectedSessions, "tcp://localhost:68748", "tcp://localhost:68747", new Context(1)));
this.subscriberTask.Start(); // start subscriber to listen to messages

while (!this.subscriberStarted)
{
}
this.subscriberReadyEvent.WaitOne(5000);
}

private void OpenSessionAndSaveDogWithChild()
Expand Down Expand Up @@ -177,20 +200,20 @@ private void OpenSessionAndSave(Animal animal)
}
}

private void StartSubscriber(int expectedSessions)
private void StartSubscriber(int expectedSessions, string publisherAddress, string syncAddress, Context context)
{
this.recievedMessages.Clear();

using(var context = new Context(1))

using (Socket subscriber = context.Socket(SocketType.SUB),
syncClient = context.Socket(SocketType.REQ))
{
subscriber.Subscribe("", Encoding.Unicode);
subscriber.Linger = 0;
subscriber.Connect("tcp://localhost:68748");
subscriberStarted = true;
subscriber.Connect(publisherAddress);

this.subscriberReadyEvent.Set();

syncClient.Connect("tcp://localhost:68747");
syncClient.Connect(syncAddress);

syncClient.Send("", Encoding.Unicode);
syncClient.Recv();
Expand Down
51 changes: 51 additions & 0 deletions src/NHibernate.ZMQLogPublisher/Configuration.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
namespace NHibernate.ZMQLogPublisher
{
using System.Collections.Generic;

public class Configuration
{
public SocketConfiguration SyncSocketConfig { get; set; }

public SocketConfiguration PublisherSocketConfig { get; set; }

public List<string> LoggersToPublish { get; set; }

public static Configuration LoadDefault()
{
var config = new Configuration();
config.SyncSocketConfig = new SocketConfiguration { Address = "tcp://*:68747" };
config.PublisherSocketConfig = new SocketConfiguration { Address = "tcp://*:68748" };
config.LoggersToPublish = new List<string>
{
"NHibernate.SQL", "NHibernate.Impl.SessionImpl", "NHibernate.Transaction.AdoTransaction",
"NHibernate.AdoNet.AbstractBatcher"
};

return config;
}

public Configuration AddLoggerKeyToPublish(string key)
{
if (!LoggersToPublish.Contains(key))
{
LoggersToPublish.Add(key);
}

return this;
}

public Configuration ConfigureSyncSocket(System.Action<SocketConfiguration> socketConfigAction)
{
socketConfigAction(this.SyncSocketConfig);

return this;
}

public Configuration ConfigurePublisherSocket(System.Action<SocketConfiguration> socketConfigAction)
{
socketConfigAction(this.PublisherSocketConfig);

return this;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,11 @@
<Compile Include="..\CommonAssemblyInfo.cs">
<Link>Properties\CommonAssemblyInfo.cs</Link>
</Compile>
<Compile Include="Configuration.cs" />
<Compile Include="LogDetails.cs" />
<Compile Include="Publisher.cs" />
<Compile Include="Properties\AssemblyInfo.cs" />
<Compile Include="SocketConfiguration.cs" />
<Compile Include="ZmqLogger.cs" />
<Compile Include="ZmqLoggerFactory.cs" />
</ItemGroup>
Expand Down
142 changes: 95 additions & 47 deletions src/NHibernate.ZMQLogPublisher/Publisher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,100 +8,148 @@

public class Publisher
{
private static Context context = new Context(1);
private static Publisher instance;

private static volatile bool stopping;
private Configuration configuration;

private static volatile bool running;
private Context context;

private static Thread publisherThread;
private ManualResetEvent threadRunningEvent;

private static ZmqLoggerFactory zmqLoggerFactory;
private ManualResetEvent threadStoppedEvent;

public static bool Running
private bool stopping;

private bool running;

private Thread publisherThread;

private ZmqLoggerFactory zmqLoggerFactory;


public static Publisher Instance
{
get
{
return running;
return instance;
}
}

public static void Start()
public bool Running
{
Start(68748);
get
{
return this.running;
}
}

public static void Start(int port)
public Publisher(Configuration configuration)
:this(configuration, new Context(1))
{
publisherThread = new Thread(() => ListenAndPublishLogMessages(port));
publisherThread.Start();
}

while (!Running)
{
}
public Publisher(Configuration configuration, Context context)
{
this.context = context;
this.configuration = configuration;
this.zmqLoggerFactory = new ZmqLoggerFactory(configuration.LoggersToPublish.ToArray());

this.threadRunningEvent = new ManualResetEvent(false);
this.threadStoppedEvent = new ManualResetEvent(false);
}

if (zmqLoggerFactory == null)
{
zmqLoggerFactory = new ZmqLoggerFactory();
}
public static void Start()
{
Start(new Publisher(Configuration.LoadDefault()));
}

zmqLoggerFactory.Initialize(context);
public static void Start(Publisher configuredInstance)
{
instance = configuredInstance;
instance.StartPublisherThread();
instance.AssociateWithNHibernate();
}

LoggerProvider.SetLoggersFactory(zmqLoggerFactory);
public static void Stop()
{
instance.Shutdown();
}

public static void Shutdown()
public void Shutdown()
{
stopping = true;
this.stopping = true;
this.running = false;

while (Running)
{
}
this.threadStoppedEvent.WaitOne();
this.stopping = false;
}

public void StartPublisherThread()
{
this.publisherThread = new Thread(() => this.ListenAndPublishLogMessages());
this.publisherThread.Start();

stopping = false;
this.threadRunningEvent.WaitOne(5000);
this.running = true;
}

private static void ListenAndPublishLogMessages(int port)
private void AssociateWithNHibernate()
{
using (Socket publisher = context.Socket(SocketType.PUB),
loggers = context.Socket(SocketType.PULL),
syncService = context.Socket(SocketType.REP))
{
publisher.Bind(string.Format("tcp://*:{0}", port));
publisher.HWM = 100000;
publisher.Linger = 0;
this.zmqLoggerFactory.Initialize(this.context);

loggers.Bind("inproc://loggers");
loggers.Linger = 0;
LoggerProvider.SetLoggersFactory(this.zmqLoggerFactory);
}

syncService.Bind("tcp://*:68747");

loggers.PollInHandler += (socket, revents) => publisher.Send(socket.Recv());
private void ListenAndPublishLogMessages()
{
using (Socket publisher = this.context.Socket(SocketType.PUB),
loggersSink = this.context.Socket(SocketType.PULL),
syncSocket = this.context.Socket(SocketType.REP))
{
this.ConfigureSocket(publisher, this.configuration.PublisherSocketConfig);
this.ConfigureSocket(syncSocket, this.configuration.SyncSocketConfig);

this.StartLoggersSink(loggersSink);
loggersSink.PollInHandler += (socket, revents) => publisher.Send(socket.Recv());

running = true;
this.threadRunningEvent.Set();

byte[] syncMessage = null;
// keep waiting for syncMessage before starting to publish
// unless we stop before we recieve the sync message
while (!stopping && syncMessage == null)
while (!this.stopping && syncMessage == null)
{
syncMessage = syncService.Recv(SendRecvOpt.NOBLOCK);
syncMessage = syncSocket.Recv(SendRecvOpt.NOBLOCK);
}

// send sync confirmation if we recieved a sync request
if(syncMessage != null)
{
syncService.Send("", Encoding.Unicode);
syncSocket.Send(string.Empty, Encoding.Unicode);
}

while (!stopping)
while (!this.stopping)
{
Context.Poller(new List<Socket> { loggers, publisher }, 1000);
Context.Poller(new List<Socket> { loggersSink, publisher }, 2000);
}
}

running = false;
zmqLoggerFactory.DisposeSockets();
this.threadStoppedEvent.Set();
this.zmqLoggerFactory.StopSockets();
}

private void StartLoggersSink(Socket loggers)
{
loggers.Linger = 0;
loggers.Bind(Transport.INPROC, "loggers");
}

private void ConfigureSocket(Socket socket, SocketConfiguration socketConfig)
{
socket.Bind(socketConfig.Address);
socket.HWM = socketConfig.HighWaterMark;
socket.Linger = socketConfig.Linger;
}
}
}
Loading

0 comments on commit e6b54a5

Please sign in to comment.