From f538aeedf02d2bf8e5d45de052f29b2bc65e23ac Mon Sep 17 00:00:00 2001 From: "anouar.hassine" Date: Wed, 27 Nov 2019 16:44:29 +0100 Subject: [PATCH 1/2] Adding filter to GetSnapshot --- ReactiveXComponent/Connection/IXCPublisher.cs | 4 +- .../RabbitMq/RabbitMqPublisher.cs | 8 ++-- .../RabbitMq/RabbitMqSnapshotManager.cs | 11 ++--- ReactiveXComponent/ReactiveXComponent.csproj | 1 - .../WebSocket/WebSocketPublisher.cs | 8 ++-- .../WebSocket/WebSocketSnapshotManager.cs | 12 +++--- .../WebSocket/WebSocketSnapshotMessage.cs | 4 +- .../Configuration/ConfigurationTests.cs | 2 +- .../RabbitMq/RabbitMqTests.cs | 2 +- ReactiveXComponentTest/RabbitMqTestApi.xcApi | 2 +- .../WebSocket/WebSocketTests.cs | 2 +- .../CreateInstances/Program.cs | 43 +++++++++++++++++-- 12 files changed, 68 insertions(+), 31 deletions(-) diff --git a/ReactiveXComponent/Connection/IXCPublisher.cs b/ReactiveXComponent/Connection/IXCPublisher.cs index 15e486b..dbf0566 100644 --- a/ReactiveXComponent/Connection/IXCPublisher.cs +++ b/ReactiveXComponent/Connection/IXCPublisher.cs @@ -10,7 +10,7 @@ public interface IXCPublisher : IDisposable void SendEvent(string stateMachine, object message, string messageType, Visibility visibility = Visibility.Public); void SendEvent(string stateMachine, object message, Visibility visibility = Visibility.Public); void SendEvent(StateMachineRefHeader stateMachineRefHeader, object message, Visibility visibility = Visibility.Public); - List GetSnapshot(string stateMachine, int? chunkSize = null, int timeout = 10000); - Task> GetSnapshotAsync(string stateMachine, int? chunkSize = null, int timeout = 10000); + List GetSnapshot(string stateMachine, string filter = null, int? chunkSize = null, int timeout = 10000); + Task> GetSnapshotAsync(string stateMachine, string filter = null, int? chunkSize = null, int timeout = 10000); } } \ No newline at end of file diff --git a/ReactiveXComponent/RabbitMq/RabbitMqPublisher.cs b/ReactiveXComponent/RabbitMq/RabbitMqPublisher.cs index 00fd9b8..9a34ed9 100644 --- a/ReactiveXComponent/RabbitMq/RabbitMqPublisher.cs +++ b/ReactiveXComponent/RabbitMq/RabbitMqPublisher.cs @@ -70,14 +70,14 @@ public void SendEvent(StateMachineRefHeader stateMachineRefHeader, object messag Send(message, routingKey, prop); } - public List GetSnapshot(string stateMachine, int? chunkSize, int timeout = 10000) + public List GetSnapshot(string stateMachine, string filter = null, int? chunkSize = null, int timeout = 10000) { - return _rabbitMqSnapshotManager.GetSnapshot(stateMachine, chunkSize, timeout); + return _rabbitMqSnapshotManager.GetSnapshot(stateMachine, filter, chunkSize, timeout); } - public Task> GetSnapshotAsync(string stateMachine, int? chunkSize, int timeout = 10000) + public Task> GetSnapshotAsync(string stateMachine, string filter = null, int? chunkSize = null, int timeout = 10000) { - return _rabbitMqSnapshotManager.GetSnapshotAsync(stateMachine, chunkSize, timeout); + return _rabbitMqSnapshotManager.GetSnapshotAsync(stateMachine, filter, chunkSize, timeout); } #endregion diff --git a/ReactiveXComponent/RabbitMq/RabbitMqSnapshotManager.cs b/ReactiveXComponent/RabbitMq/RabbitMqSnapshotManager.cs index eb260d4..b7eb9a9 100644 --- a/ReactiveXComponent/RabbitMq/RabbitMqSnapshotManager.cs +++ b/ReactiveXComponent/RabbitMq/RabbitMqSnapshotManager.cs @@ -73,7 +73,7 @@ private static SnapshotResponse AggregateChunks(ConcurrentBag GetSnapshot(string stateMachine, int? chunkSize = null, int timeout = 10000) + public List GetSnapshot(string stateMachine, string filter = null, int? chunkSize = null, int timeout = 10000) { var guid = Guid.NewGuid(); var requestId = guid.ToString(); @@ -119,7 +119,7 @@ public List GetSnapshot(string stateMachine, int? chunkSize = SnapshotReceived += snapshotListenerOnMessageReceived; SubscribeSnapshot(stateMachine, requestId); - SendSnapshotRequest(stateMachine, requestId, chunkSize, _privateCommunicationIdentifier); + SendSnapshotRequest(stateMachine, requestId, filter, chunkSize, _privateCommunicationIdentifier); if (receivedSnapshotChunksInitialized.WaitOne(timeout)) { @@ -183,12 +183,12 @@ public List GetSnapshot(string stateMachine, int? chunkSize = return result; } - public Task> GetSnapshotAsync(string stateMachine, int? chunkSize = null, int timeout = 10000) + public Task> GetSnapshotAsync(string stateMachine, string filter = null, int? chunkSize = null, int timeout = 10000) { - return Task.Run(() => GetSnapshot(stateMachine, chunkSize, timeout)); + return Task.Run(() => GetSnapshot(stateMachine, filter, chunkSize, timeout)); } - private void SendSnapshotRequest(string stateMachine, string replyTopic, int? chunkSize, string privateCommunicationIdentifier = null) + private void SendSnapshotRequest(string stateMachine, string replyTopic, string filter = null, int? chunkSize = null, string privateCommunicationIdentifier = null) { if (_xcConfiguration == null) return; @@ -212,6 +212,7 @@ private void SendSnapshotRequest(string stateMachine, string replyTopic, int? ch CallerPrivateTopic = !string.IsNullOrEmpty(privateCommunicationIdentifier) ? new List{ privateCommunicationIdentifier} : null, + Filter = filter, ChunkSize = chunkSize }; diff --git a/ReactiveXComponent/ReactiveXComponent.csproj b/ReactiveXComponent/ReactiveXComponent.csproj index acfa69c..cc60678 100644 --- a/ReactiveXComponent/ReactiveXComponent.csproj +++ b/ReactiveXComponent/ReactiveXComponent.csproj @@ -34,7 +34,6 @@ $(MSBuildThisFileDirectory)$(OutputPath) - diff --git a/ReactiveXComponent/WebSocket/WebSocketPublisher.cs b/ReactiveXComponent/WebSocket/WebSocketPublisher.cs index 0f50bdf..d62f261 100644 --- a/ReactiveXComponent/WebSocket/WebSocketPublisher.cs +++ b/ReactiveXComponent/WebSocket/WebSocketPublisher.cs @@ -65,14 +65,14 @@ public void SendEvent(StateMachineRefHeader stateMachineRefHeader, object messag _webSocketClient.Send(webSocketRequest); } - public List GetSnapshot(string stateMachine, int? chunkSize, int timeout = 10000) + public List GetSnapshot(string stateMachine, string filter = null, int? chunkSize = null, int timeout = 10000) { - return _webSocketSnapshotManager.GetSnapshot(stateMachine, chunkSize, timeout); + return _webSocketSnapshotManager.GetSnapshot(stateMachine, filter, chunkSize, timeout); } - public Task> GetSnapshotAsync(string stateMachine, int? chunkSize, int timeout = 10000) + public Task> GetSnapshotAsync(string stateMachine, string filter = null, int? chunkSize = null, int timeout = 10000) { - return _webSocketSnapshotManager.GetSnapshotAsync(stateMachine, chunkSize, timeout); + return _webSocketSnapshotManager.GetSnapshotAsync(stateMachine, filter, chunkSize, timeout); } #endregion diff --git a/ReactiveXComponent/WebSocket/WebSocketSnapshotManager.cs b/ReactiveXComponent/WebSocket/WebSocketSnapshotManager.cs index 21432ff..75da4ae 100644 --- a/ReactiveXComponent/WebSocket/WebSocketSnapshotManager.cs +++ b/ReactiveXComponent/WebSocket/WebSocketSnapshotManager.cs @@ -43,7 +43,7 @@ public WebSocketSnapshotManager(string component, IWebSocketClient webSocketClie h => SnapshotReceived -= h); } - public List GetSnapshot(string stateMachine, int? chunkSize = null, int timeout = 10000) + public List GetSnapshot(string stateMachine, string filter = null, int? chunkSize = null, int timeout = 10000) { var replyTopic = Guid.NewGuid().ToString(); @@ -74,7 +74,7 @@ public List GetSnapshot(string stateMachine, int? chunkSize = using (_snapshotStream.Subscribe(observer)) { SendWebSocketSnapshotSubscriptionResquest(replyTopic); - SendWebSocketSnapshotRequest(stateMachine, replyTopic, chunkSize); + SendWebSocketSnapshotRequest(stateMachine, replyTopic, filter, chunkSize); lockEvent.WaitOne(timeout); } @@ -84,12 +84,12 @@ public List GetSnapshot(string stateMachine, int? chunkSize = return result; } - public Task> GetSnapshotAsync(string stateMachine, int? chunkSize = null, int timeout = 10000) + public Task> GetSnapshotAsync(string stateMachine, string filter = null, int? chunkSize = null, int timeout = 10000) { - return Task.Run(() => GetSnapshot(stateMachine, chunkSize, timeout)); + return Task.Run(() => GetSnapshot(stateMachine, filter, chunkSize, timeout)); } - private void SendWebSocketSnapshotRequest(string stateMachine, string replyTopic, int? chunkSize) + private void SendWebSocketSnapshotRequest(string stateMachine, string replyTopic, string filter = null, int? chunkSize = null) { if (!_webSocketClient.IsOpen) return; @@ -101,7 +101,7 @@ private void SendWebSocketSnapshotRequest(string stateMachine, string replyTopic ComponentCode = componentCode, StateMachineCode = stateMachineCode }; - var snapshotMessage = new WebSocketSnapshotMessage(stateMachineCode, componentCode, replyTopic, _privateCommunicationIdentifier, chunkSize); + var snapshotMessage = new WebSocketSnapshotMessage(stateMachineCode, componentCode, replyTopic, _privateCommunicationIdentifier, chunkSize, filter); var webSocketRequest = WebSocketMessageHelper.SerializeRequest( WebSocketCommand.Snapshot, inputHeader, diff --git a/ReactiveXComponent/WebSocket/WebSocketSnapshotMessage.cs b/ReactiveXComponent/WebSocket/WebSocketSnapshotMessage.cs index f3559df..ff732e7 100644 --- a/ReactiveXComponent/WebSocket/WebSocketSnapshotMessage.cs +++ b/ReactiveXComponent/WebSocket/WebSocketSnapshotMessage.cs @@ -10,10 +10,11 @@ public class WebSocketSnapshotMessage public string ReplyTopic { get; set; } public string[] CallerPrivateTopic { get; set; } public int? ChunkSize { get; set; } + public string Filter { get; set; } public WebSocketSnapshotMessage() { } - public WebSocketSnapshotMessage(long stateMachineCode, long componentCode, string replyTopic, string callerPrivateTopic, int? chunkSize) + public WebSocketSnapshotMessage(long stateMachineCode, long componentCode, string replyTopic, string callerPrivateTopic, int? chunkSize, string filter = null) { StateMachineCode = stateMachineCode; ComponentCode = componentCode; @@ -23,6 +24,7 @@ public WebSocketSnapshotMessage(long stateMachineCode, long componentCode, strin CallerPrivateTopic = new []{ callerPrivateTopic }; } ChunkSize = chunkSize; + Filter = filter; } } } diff --git a/ReactiveXComponentTest/Configuration/ConfigurationTests.cs b/ReactiveXComponentTest/Configuration/ConfigurationTests.cs index 7f10ad4..b9acec5 100644 --- a/ReactiveXComponentTest/Configuration/ConfigurationTests.cs +++ b/ReactiveXComponentTest/Configuration/ConfigurationTests.cs @@ -83,7 +83,7 @@ public void GetBusDetailsTest() Check.That(busDetails.SslServerName).IsEqualTo("XComponent RMq"); Check.That(busDetails.SslCertificatePath).IsEqualTo("some_cert_path"); Check.That(busDetails.SslCertificatePassphrase).IsEqualTo("some_cert_pass"); - Check.That(busDetails.SslProtocol).IsEqualTo(SslProtocols.Default); + Check.That(busDetails.SslProtocol).IsEqualTo(SslProtocols.Tls12); Check.That(busDetails.SslAllowUntrustedServerCertificate).IsTrue(); } diff --git a/ReactiveXComponentTest/RabbitMq/RabbitMqTests.cs b/ReactiveXComponentTest/RabbitMq/RabbitMqTests.cs index 125b1ad..b3a4a2d 100644 --- a/ReactiveXComponentTest/RabbitMq/RabbitMqTests.cs +++ b/ReactiveXComponentTest/RabbitMq/RabbitMqTests.cs @@ -403,7 +403,7 @@ public void SnapshotTest(int instancesCount, int chunkSize) List snapshotInstances = null; Task.Run(async () => { - snapshotInstances = await publisher.GetSnapshotAsync(StateMachineA, chunkSize); + snapshotInstances = await publisher.GetSnapshotAsync(StateMachineA, chunkSize: chunkSize); }).GetAwaiter().OnCompleted(() => { snapshotReceivedEvent.Set(); diff --git a/ReactiveXComponentTest/RabbitMqTestApi.xcApi b/ReactiveXComponentTest/RabbitMqTestApi.xcApi index cf273da..5b4f548 100644 --- a/ReactiveXComponentTest/RabbitMqTestApi.xcApi +++ b/ReactiveXComponentTest/RabbitMqTestApi.xcApi @@ -3,7 +3,7 @@ Binary - + diff --git a/ReactiveXComponentTest/WebSocket/WebSocketTests.cs b/ReactiveXComponentTest/WebSocket/WebSocketTests.cs index e588c4d..a81b14b 100644 --- a/ReactiveXComponentTest/WebSocket/WebSocketTests.cs +++ b/ReactiveXComponentTest/WebSocket/WebSocketTests.cs @@ -305,7 +305,7 @@ public void SnapshotTest() List instances = new List(); Task.Run(async () => { - instances = await webSocketPublisher.GetSnapshotAsync(stateMachineName, 2); + instances = await webSocketPublisher.GetSnapshotAsync(stateMachineName, chunkSize:2); }).GetAwaiter().OnCompleted(() => { snapshotReceivedEvent.Set(); diff --git a/docker/integration_tests/XCProjects/HelloWorldV5/CreateInstancesReactiveApi/CreateInstances/Program.cs b/docker/integration_tests/XCProjects/HelloWorldV5/CreateInstancesReactiveApi/CreateInstances/Program.cs index 23013ee..fd8cdd2 100644 --- a/docker/integration_tests/XCProjects/HelloWorldV5/CreateInstancesReactiveApi/CreateInstances/Program.cs +++ b/docker/integration_tests/XCProjects/HelloWorldV5/CreateInstancesReactiveApi/CreateInstances/Program.cs @@ -28,18 +28,25 @@ class ProgramInBatchWithReactiveApi static int Main() { - for (int i = 0; i < MaxNumberOfTries; i++) + var triesCount = 0; + + while (triesCount < MaxNumberOfTries) { try { + triesCount++; int result = TryToRunTest(); return result; } catch (Exception e) { - throw new ReactiveXComponentException("Error while running test: " + e.Message, e); + if (triesCount == MaxNumberOfTries) + { + throw new ReactiveXComponentException("Error while running test: " + e.Message, e); + } } } + Console.WriteLine("Exited test, failed to run"); return TooManyTriesExitCode; } @@ -60,6 +67,8 @@ private static int TryToRunTest() { TestCreateInstanceReactiveApi(publisher, createdInstanceIds, subscriber); + TestGetSnapshotWithFilter(publisher, createdInstanceIds); + TestDestroyInstanceReactiveApi(subscriber, createdInstanceIds, publisher); return 0; @@ -94,7 +103,7 @@ private static void TestDestroyInstanceReactiveApi(IXCSubscriber subscriber, Con List instanceList = createdInstanceIds.Values.ToList(); foreach (var helloWorldResponseInstance in instanceList) { - publisher.SendEvent(helloWorldResponseInstance.StateMachineRefHeader, new SayGoodbye() {Name = "toto"}); + publisher.SendEvent(helloWorldResponseInstance.StateMachineRefHeader, new SayGoodbye()); } if (!waitForInstanceDestruction.WaitOne(TimeToWaitForInstanceState)) @@ -133,7 +142,7 @@ private static void TestCreateInstanceReactiveApi(IXCPublisher publisher, Concur { for (int i = 0; i < NumberOfInstances; i++) { - publisher.SendEvent(entryPointSnapshot.StateMachineRefHeader, new SayHello() {Name = "toto"}); + publisher.SendEvent(entryPointSnapshot.StateMachineRefHeader, new SayHello() {Name = $"toto{i}"}); } if (!waitForInstanceCreation.WaitOne(TimeToWaitForInstanceState)) @@ -144,6 +153,32 @@ private static void TestCreateInstanceReactiveApi(IXCPublisher publisher, Concur } } + private static void TestGetSnapshotWithFilter( + IXCPublisher publisher, + ConcurrentDictionary createdInstanceIds) + { + Console.WriteLine("Testing get snapshot with filter"); + + for (int i = 0; i < NumberOfInstances; i++) + { + var filter = $"OriginatorName == toto{i}"; + var instances = publisher.GetSnapshot(ResponseStateMachineName, filter); + + if (instances == null) + { + Console.WriteLine($"Received a null snapshot for instance with filter = {filter}"); + throw new Exception("Error while testing snapshot"); + } + + if (instances.Count != 1) + { + Console.WriteLine($"Received {instances.Count} instances for get snapshot for instance with filter = {filter}. Expected only one."); + throw new Exception("Error while testing snapshot"); + } + } + + } + private static void ObserverStateMachineExecuted(int instanceCount, AutoResetEvent waitForInstanceState, string stateInstance) { Console.WriteLine($"{instanceCount} instance " + stateInstance); From 38b0e8ea19cee6198c7a954cc6a9f291bdd23ca9 Mon Sep 17 00:00:00 2001 From: "anouar.hassine" Date: Wed, 27 Nov 2019 17:02:17 +0100 Subject: [PATCH 2/2] Fixing integration test --- .../CreateInstancesReactiveApi/CreateInstances/Program.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docker/integration_tests/XCProjects/HelloWorldV5/CreateInstancesReactiveApi/CreateInstances/Program.cs b/docker/integration_tests/XCProjects/HelloWorldV5/CreateInstancesReactiveApi/CreateInstances/Program.cs index fd8cdd2..6644edb 100644 --- a/docker/integration_tests/XCProjects/HelloWorldV5/CreateInstancesReactiveApi/CreateInstances/Program.cs +++ b/docker/integration_tests/XCProjects/HelloWorldV5/CreateInstancesReactiveApi/CreateInstances/Program.cs @@ -161,7 +161,7 @@ private static void TestGetSnapshotWithFilter( for (int i = 0; i < NumberOfInstances; i++) { - var filter = $"OriginatorName == toto{i}"; + var filter = $"OriginatorName == \"toto{i}\""; var instances = publisher.GetSnapshot(ResponseStateMachineName, filter); if (instances == null)