Permalink
Browse files

Convert Device tests.

  • Loading branch information...
jgoz committed Nov 11, 2012
1 parent a22be56 commit 363636c67c67368c472186288089148fdab1b919
@@ -0,0 +1,154 @@
+namespace ZeroMQ.AcceptanceTests
+{
+ using System;
+ using System.Threading;
+ using Devices;
+ using NUnit.Framework;
+
+ public abstract class UsingThreadedDevice<TDevice> where TDevice : Device
+ {
+ protected const string FrontendAddr = "inproc://dev_frontend";
+ protected const string BackendAddr = "inproc://dev_backend";
+
+ protected Func<TDevice> CreateDevice;
+ protected Func<ZmqSocket> CreateSender;
+ protected Func<ZmqSocket> CreateReceiver;
+
+ protected ZmqSocket Sender;
+ protected ZmqSocket Receiver;
+ protected TDevice Device;
+ protected ZmqContext ZmqContext;
+
+ protected Action<TDevice> DeviceInit;
+ protected Action<ZmqSocket> SenderInit;
+ protected Action<ZmqSocket> SenderAction;
+ protected Action<ZmqSocket> ReceiverInit;
+ protected Action<ZmqSocket> ReceiverAction;
+
+ private Thread _deviceThread;
+ private Thread _receiverThread;
+ private Thread _senderThread;
+
+ private ManualResetEvent _deviceReady;
+ private ManualResetEvent _receiverReady;
+ private ManualResetEvent _receiverDone;
+
+ protected UsingThreadedDevice()
+ {
+ DeviceInit = dev => { };
+ SenderInit = sck => { };
+ ReceiverInit = sck => { };
+ SenderAction = sck => { };
+ ReceiverAction = sck => { };
+ }
+
+ [TestFixtureSetUp]
+ public void Initialize()
+ {
+ ZmqContext = ZmqContext.Create();
+ Device = CreateDevice();
+ Sender = CreateSender();
+ Receiver = CreateReceiver();
+
+ _deviceReady = new ManualResetEvent(false);
+ _receiverReady = new ManualResetEvent(false);
+ _receiverDone = new ManualResetEvent(false);
+
+ _deviceThread = new Thread(() =>
+ {
+ DeviceInit(Device);
+ Device.Initialize();
+
+ _deviceReady.Set();
+
+ Device.Start();
+ });
+
+ _receiverThread = new Thread(() =>
+ {
+ _deviceReady.WaitOne();
+
+ ReceiverInit(Receiver);
+ Receiver.ReceiveHighWatermark = 1;
+ Receiver.Linger = TimeSpan.Zero;
+ Receiver.Connect(BackendAddr);
+
+ _receiverReady.Set();
+
+ ReceiverAction(Receiver);
+
+ _receiverDone.Set();
+ });
+
+ _senderThread = new Thread(() =>
+ {
+ _receiverReady.WaitOne();
+
+ SenderInit(Sender);
+ Sender.SendHighWatermark = 1;
+ Sender.Linger = TimeSpan.Zero;
+ Sender.Connect(FrontendAddr);
+
+ Device.PollerPulse.WaitOne();
+
+ SenderAction(Sender);
+ });
+
+ StartThreads();
+ }
+
+ [TestFixtureTearDown]
+ public void Cleanup()
+ {
+ _receiverDone.WaitOne();
+
+ _deviceReady.Dispose();
+ _receiverReady.Dispose();
+ _receiverDone.Dispose();
+
+ if (Sender != null)
+ {
+ Sender.Dispose();
+ }
+
+ if (Receiver != null)
+ {
+ Receiver.Dispose();
+ }
+
+ if (Device != null)
+ {
+ Device.Dispose();
+ }
+
+ if (ZmqContext != null)
+ {
+ ZmqContext.Dispose();
+ }
+ }
+
+ protected void StartThreads()
+ {
+ _deviceThread.Start();
+ _receiverThread.Start();
+ _senderThread.Start();
+
+ if (!_receiverThread.Join(5000))
+ {
+ _receiverThread.Abort();
+ }
+
+ if (!_senderThread.Join(5000))
+ {
+ _senderThread.Abort();
+ }
+
+ Device.Stop();
+
+ if (!_deviceThread.Join(5000))
+ {
+ _deviceThread.Abort();
+ }
+ }
+ }
+}
@@ -1,111 +0,0 @@
-namespace ZeroMQ.AcceptanceTests.DeviceSpecs
-{
- using System;
-
- using Devices;
-
- using Machine.Specifications;
-
- [Subject("Forwarder")]
- class when_using_forwarder_device_with_full_subscription : using_forwarder_device
- {
- protected static Frame message1;
- protected static Frame message2;
- protected static SendStatus sendResult1;
- protected static SendStatus sendResult2;
-
- Establish context = () =>
- {
- deviceInit = dev => dev.FrontendSetup.SubscribeAll();
- receiverInit = sub => sub.SubscribeAll();
-
- receiverAction = sub =>
- {
- message1 = sub.ReceiveFrame();
- message2 = sub.ReceiveFrame(TimeSpan.FromMilliseconds(50));
- };
-
- senderAction = pub =>
- {
- sendResult1 = pub.SendFrame(Messages.PubSubFirst);
- sendResult2 = pub.SendFrame(Messages.PubSubSecond);
- };
- };
-
- Because of = StartThreads;
-
- Behaves_like<PubSubReceiveAll> successfully_received_all_messages;
- }
-
- [Subject("Forwarder")]
- class when_using_forwarder_device_with_a_receiver_subscription : using_forwarder_device
- {
- protected static Frame message1;
- protected static Frame message2;
- protected static SendStatus sendResult1;
- protected static SendStatus sendResult2;
-
- Establish context = () =>
- {
- deviceInit = dev => dev.FrontendSetup.SubscribeAll();
- receiverInit = sub => sub.Subscribe(Messages.PubSubPrefix);
-
- receiverAction = sub =>
- {
- message1 = sub.ReceiveFrame();
- message2 = sub.ReceiveFrame(TimeSpan.FromMilliseconds(50));
- };
-
- senderAction = pub =>
- {
- sendResult1 = pub.SendFrame(Messages.PubSubFirst);
- sendResult2 = pub.SendFrame(Messages.PubSubSecond);
- };
- };
-
- Because of = StartThreads;
-
- Behaves_like<PubSubReceiveFirst> successfully_received_first_message_and_filtered_out_second;
- }
-
- [Subject("Forwarder")]
- class when_using_forwarder_device_with_a_device_subscription : using_forwarder_device
- {
- protected static Frame message1;
- protected static Frame message2;
- protected static SendStatus sendResult1;
- protected static SendStatus sendResult2;
-
- Establish context = () =>
- {
- deviceInit = dev => dev.FrontendSetup.Subscribe(Messages.PubSubPrefix);
- receiverInit = sub => sub.SubscribeAll();
-
- receiverAction = sub =>
- {
- message1 = sub.ReceiveFrame();
- message2 = sub.ReceiveFrame(TimeSpan.FromMilliseconds(50));
- };
-
- senderAction = pub =>
- {
- sendResult1 = pub.SendFrame(Messages.PubSubFirst);
- sendResult2 = pub.SendFrame(Messages.PubSubSecond);
- };
- };
-
- Because of = StartThreads;
-
- Behaves_like<PubSubReceiveFirst> successfully_received_first_message_and_filtered_out_second;
- }
-
- abstract class using_forwarder_device : using_threaded_device<ForwarderDevice>
- {
- static using_forwarder_device()
- {
- createSender = () => zmqContext.CreateSocket(SocketType.PUB);
- createReceiver = () => zmqContext.CreateSocket(SocketType.SUB);
- createDevice = () => new ForwarderDevice(zmqContext, FrontendAddr, BackendAddr, DeviceMode.Blocking);
- }
- }
-}
@@ -1,123 +0,0 @@
-namespace ZeroMQ.AcceptanceTests.DeviceSpecs
-{
- using System;
-
- using Devices;
-
- using Machine.Specifications;
-
- [Subject("Queue")]
- class when_using_queue_device_to_send_a_single_message_in_blocking_mode : using_queue_device
- {
- protected static Frame message;
- protected static SendStatus sendResult;
-
- Establish context = () =>
- {
- senderAction = req => sendResult = req.SendFrame(Messages.SingleMessage);
- receiverAction = rep => message = rep.ReceiveFrame();
- };
-
- Because of = StartThreads;
-
- Behaves_like<SingleMessageReceived> successfully_sent_single_message;
- }
-
- [Subject("Queue")]
- class when_using_queue_device_to_send_a_single_message_with_an_ample_timeout : using_queue_device
- {
- protected static Frame message;
- protected static SendStatus sendResult;
-
- Establish context = () =>
- {
- senderAction = req => sendResult = req.SendFrame(Messages.SingleMessage, TimeSpan.FromMilliseconds(2000));
- receiverAction = rep => message = rep.ReceiveFrame(TimeSpan.FromMilliseconds(2000));
- };
-
- Because of = StartThreads;
-
- Behaves_like<SingleMessageReceived> successfully_sent_single_message;
- }
-
- [Subject("Queue")]
- class when_using_queue_device_to_receive_a_single_message_with_insufficient_timeout : using_queue_device
- {
- protected static Frame message;
-
- Establish context = () =>
- {
- receiverAction = rep => message = rep.ReceiveFrame(TimeSpan.FromMilliseconds(0));
- };
-
- Because of = StartThreads;
-
- Behaves_like<SingleMessageNotReceived> receiver_must_try_again;
- }
-
- [Subject("Queue")]
- class when_using_queue_device_to_send_a_multipart_message_in_blocking_mode : using_queue_device
- {
- protected static ZmqMessage message;
- protected static SendStatus sendResult1;
- protected static SendStatus sendResult2;
-
- Establish context = () =>
- {
- senderAction = req =>
- {
- sendResult1 = req.SendFrame(Messages.MultiFirst);
- sendResult2 = req.SendFrame(Messages.MultiLast);
- };
-
- receiverAction = rep =>
- {
- message = rep.ReceiveMessage();
- };
- };
-
- Because of = StartThreads;
-
- Behaves_like<CompleteMessageReceived> successfully_sent_multi_part_message;
- }
-
- [Subject("Queue")]
- class when_using_queue_device_to_send_a_multipart_message_with_an_ample_timeout : using_queue_device
- {
- protected static ZmqMessage message;
- protected static SendStatus sendResult1;
- protected static SendStatus sendResult2;
-
- Establish context = () =>
- {
- senderAction = req =>
- {
- sendResult1 = req.SendFrame(Messages.MultiFirst, TimeSpan.FromMilliseconds(2000));
- sendResult2 = req.SendFrame(Messages.MultiLast, TimeSpan.FromMilliseconds(2000));
- };
-
- receiverAction = rep =>
- {
- message = new ZmqMessage(new[]
- {
- rep.ReceiveFrame(TimeSpan.FromMilliseconds(2000)),
- rep.ReceiveFrame(TimeSpan.FromMilliseconds(2000))
- });
- };
- };
-
- Because of = StartThreads;
-
- Behaves_like<CompleteMessageReceived> sends_multi_part_message_successfully;
- }
-
- abstract class using_queue_device : using_threaded_device<QueueDevice>
- {
- static using_queue_device()
- {
- createSender = () => zmqContext.CreateSocket(SocketType.REQ);
- createReceiver = () => zmqContext.CreateSocket(SocketType.REP);
- createDevice = () => new QueueDevice(zmqContext, FrontendAddr, BackendAddr, DeviceMode.Blocking);
- }
- }
-}
Oops, something went wrong.

0 comments on commit 363636c

Please sign in to comment.