Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions ReactiveXComponent/Connection/IXCPublisher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ public interface IXCPublisher : IDisposable
void SendEvent(string stateMachine, object message, string messageType, Visibility visibility = Visibility.Public);
void SendEvent(string stateMachine, object message, Visibility visibility = Visibility.Public);
void SendEvent(StateMachineRefHeader stateMachineRefHeader, object message, Visibility visibility = Visibility.Public);
List<MessageEventArgs> GetSnapshot(string stateMachine, int? chunkSize = null, int timeout = 10000);
Task<List<MessageEventArgs>> GetSnapshotAsync(string stateMachine, int? chunkSize = null, int timeout = 10000);
List<MessageEventArgs> GetSnapshot(string stateMachine, string filter = null, int? chunkSize = null, int timeout = 10000);
Task<List<MessageEventArgs>> GetSnapshotAsync(string stateMachine, string filter = null, int? chunkSize = null, int timeout = 10000);
}
}
8 changes: 4 additions & 4 deletions ReactiveXComponent/RabbitMq/RabbitMqPublisher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -70,14 +70,14 @@ public void SendEvent(StateMachineRefHeader stateMachineRefHeader, object messag
Send(message, routingKey, prop);
}

public List<MessageEventArgs> GetSnapshot(string stateMachine, int? chunkSize, int timeout = 10000)
public List<MessageEventArgs> GetSnapshot(string stateMachine, string filter = null, int? chunkSize = null, int timeout = 10000)
{
return _rabbitMqSnapshotManager.GetSnapshot(stateMachine, chunkSize, timeout);
return _rabbitMqSnapshotManager.GetSnapshot(stateMachine, filter, chunkSize, timeout);
}

public Task<List<MessageEventArgs>> GetSnapshotAsync(string stateMachine, int? chunkSize, int timeout = 10000)
public Task<List<MessageEventArgs>> GetSnapshotAsync(string stateMachine, string filter = null, int? chunkSize = null, int timeout = 10000)
{
return _rabbitMqSnapshotManager.GetSnapshotAsync(stateMachine, chunkSize, timeout);
return _rabbitMqSnapshotManager.GetSnapshotAsync(stateMachine, filter, chunkSize, timeout);
}

#endregion
Expand Down
11 changes: 6 additions & 5 deletions ReactiveXComponent/RabbitMq/RabbitMqSnapshotManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ private static SnapshotResponse AggregateChunks(ConcurrentBag<SnapshotResponseCh
return aggregatedResult;
}

public List<MessageEventArgs> GetSnapshot(string stateMachine, int? chunkSize = null, int timeout = 10000)
public List<MessageEventArgs> GetSnapshot(string stateMachine, string filter = null, int? chunkSize = null, int timeout = 10000)
{
var guid = Guid.NewGuid();
var requestId = guid.ToString();
Expand Down Expand Up @@ -119,7 +119,7 @@ public List<MessageEventArgs> GetSnapshot(string stateMachine, int? chunkSize =
SnapshotReceived += snapshotListenerOnMessageReceived;

SubscribeSnapshot(stateMachine, requestId);
SendSnapshotRequest(stateMachine, requestId, chunkSize, _privateCommunicationIdentifier);
SendSnapshotRequest(stateMachine, requestId, filter, chunkSize, _privateCommunicationIdentifier);

if (receivedSnapshotChunksInitialized.WaitOne(timeout))
{
Expand Down Expand Up @@ -183,12 +183,12 @@ public List<MessageEventArgs> GetSnapshot(string stateMachine, int? chunkSize =
return result;
}

public Task<List<MessageEventArgs>> GetSnapshotAsync(string stateMachine, int? chunkSize = null, int timeout = 10000)
public Task<List<MessageEventArgs>> GetSnapshotAsync(string stateMachine, string filter = null, int? chunkSize = null, int timeout = 10000)
{
return Task.Run(() => GetSnapshot(stateMachine, chunkSize, timeout));
return Task.Run(() => GetSnapshot(stateMachine, filter, chunkSize, timeout));
}

private void SendSnapshotRequest(string stateMachine, string replyTopic, int? chunkSize, string privateCommunicationIdentifier = null)
private void SendSnapshotRequest(string stateMachine, string replyTopic, string filter = null, int? chunkSize = null, string privateCommunicationIdentifier = null)
{
if (_xcConfiguration == null)
return;
Expand All @@ -212,6 +212,7 @@ private void SendSnapshotRequest(string stateMachine, string replyTopic, int? ch
CallerPrivateTopic = !string.IsNullOrEmpty(privateCommunicationIdentifier)
? new List<string>{ privateCommunicationIdentifier}
: null,
Filter = filter,
ChunkSize = chunkSize
};

Expand Down
1 change: 0 additions & 1 deletion ReactiveXComponent/ReactiveXComponent.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
<WorkingDirectory>$(MSBuildThisFileDirectory)$(OutputPath)</WorkingDirectory>
</PropertyGroup>
<ItemGroup>
<InputAssemblies Include="$(WorkingDirectory)\ReactiveXComponent.dll" />
<InputAssemblies Include="$(WorkingDirectory)\Newtonsoft.Json.dll" />
<InputAssemblies Include="$(WorkingDirectory)\RabbitMQ.Client.dll" />
<InputAssemblies Include="$(WorkingDirectory)\WebSocket4Net.dll" />
Expand Down
8 changes: 4 additions & 4 deletions ReactiveXComponent/WebSocket/WebSocketPublisher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -65,14 +65,14 @@ public void SendEvent(StateMachineRefHeader stateMachineRefHeader, object messag
_webSocketClient.Send(webSocketRequest);
}

public List<MessageEventArgs> GetSnapshot(string stateMachine, int? chunkSize, int timeout = 10000)
public List<MessageEventArgs> GetSnapshot(string stateMachine, string filter = null, int? chunkSize = null, int timeout = 10000)
{
return _webSocketSnapshotManager.GetSnapshot(stateMachine, chunkSize, timeout);
return _webSocketSnapshotManager.GetSnapshot(stateMachine, filter, chunkSize, timeout);
}

public Task<List<MessageEventArgs>> GetSnapshotAsync(string stateMachine, int? chunkSize, int timeout = 10000)
public Task<List<MessageEventArgs>> GetSnapshotAsync(string stateMachine, string filter = null, int? chunkSize = null, int timeout = 10000)
{
return _webSocketSnapshotManager.GetSnapshotAsync(stateMachine, chunkSize, timeout);
return _webSocketSnapshotManager.GetSnapshotAsync(stateMachine, filter, chunkSize, timeout);
}

#endregion
Expand Down
12 changes: 6 additions & 6 deletions ReactiveXComponent/WebSocket/WebSocketSnapshotManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public WebSocketSnapshotManager(string component, IWebSocketClient webSocketClie
h => SnapshotReceived -= h);
}

public List<MessageEventArgs> GetSnapshot(string stateMachine, int? chunkSize = null, int timeout = 10000)
public List<MessageEventArgs> GetSnapshot(string stateMachine, string filter = null, int? chunkSize = null, int timeout = 10000)
{
var replyTopic = Guid.NewGuid().ToString();

Expand Down Expand Up @@ -74,7 +74,7 @@ public List<MessageEventArgs> GetSnapshot(string stateMachine, int? chunkSize =
using (_snapshotStream.Subscribe(observer))
{
SendWebSocketSnapshotSubscriptionResquest(replyTopic);
SendWebSocketSnapshotRequest(stateMachine, replyTopic, chunkSize);
SendWebSocketSnapshotRequest(stateMachine, replyTopic, filter, chunkSize);
lockEvent.WaitOne(timeout);
}

Expand All @@ -84,12 +84,12 @@ public List<MessageEventArgs> GetSnapshot(string stateMachine, int? chunkSize =
return result;
}

public Task<List<MessageEventArgs>> GetSnapshotAsync(string stateMachine, int? chunkSize = null, int timeout = 10000)
public Task<List<MessageEventArgs>> GetSnapshotAsync(string stateMachine, string filter = null, int? chunkSize = null, int timeout = 10000)
{
return Task.Run(() => GetSnapshot(stateMachine, chunkSize, timeout));
return Task.Run(() => GetSnapshot(stateMachine, filter, chunkSize, timeout));
}

private void SendWebSocketSnapshotRequest(string stateMachine, string replyTopic, int? chunkSize)
private void SendWebSocketSnapshotRequest(string stateMachine, string replyTopic, string filter = null, int? chunkSize = null)
{
if (!_webSocketClient.IsOpen) return;

Expand All @@ -101,7 +101,7 @@ private void SendWebSocketSnapshotRequest(string stateMachine, string replyTopic
ComponentCode = componentCode,
StateMachineCode = stateMachineCode
};
var snapshotMessage = new WebSocketSnapshotMessage(stateMachineCode, componentCode, replyTopic, _privateCommunicationIdentifier, chunkSize);
var snapshotMessage = new WebSocketSnapshotMessage(stateMachineCode, componentCode, replyTopic, _privateCommunicationIdentifier, chunkSize, filter);
var webSocketRequest = WebSocketMessageHelper.SerializeRequest(
WebSocketCommand.Snapshot,
inputHeader,
Expand Down
4 changes: 3 additions & 1 deletion ReactiveXComponent/WebSocket/WebSocketSnapshotMessage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,11 @@ public class WebSocketSnapshotMessage
public string ReplyTopic { get; set; }
public string[] CallerPrivateTopic { get; set; }
public int? ChunkSize { get; set; }
public string Filter { get; set; }

public WebSocketSnapshotMessage() { }

public WebSocketSnapshotMessage(long stateMachineCode, long componentCode, string replyTopic, string callerPrivateTopic, int? chunkSize)
public WebSocketSnapshotMessage(long stateMachineCode, long componentCode, string replyTopic, string callerPrivateTopic, int? chunkSize, string filter = null)
{
StateMachineCode = stateMachineCode;
ComponentCode = componentCode;
Expand All @@ -23,6 +24,7 @@ public WebSocketSnapshotMessage(long stateMachineCode, long componentCode, strin
CallerPrivateTopic = new []{ callerPrivateTopic };
}
ChunkSize = chunkSize;
Filter = filter;
}
}
}
2 changes: 1 addition & 1 deletion ReactiveXComponentTest/Configuration/ConfigurationTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ public void GetBusDetailsTest()
Check.That(busDetails.SslServerName).IsEqualTo("XComponent RMq");
Check.That(busDetails.SslCertificatePath).IsEqualTo("some_cert_path");
Check.That(busDetails.SslCertificatePassphrase).IsEqualTo("some_cert_pass");
Check.That(busDetails.SslProtocol).IsEqualTo(SslProtocols.Default);
Check.That(busDetails.SslProtocol).IsEqualTo(SslProtocols.Tls12);
Check.That(busDetails.SslAllowUntrustedServerCertificate).IsTrue();
}

Expand Down
2 changes: 1 addition & 1 deletion ReactiveXComponentTest/RabbitMq/RabbitMqTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -403,7 +403,7 @@ public void SnapshotTest(int instancesCount, int chunkSize)
List<MessageEventArgs> snapshotInstances = null;
Task.Run(async () =>
{
snapshotInstances = await publisher.GetSnapshotAsync(StateMachineA, chunkSize);
snapshotInstances = await publisher.GetSnapshotAsync(StateMachineA, chunkSize: chunkSize);
}).GetAwaiter().OnCompleted(() =>
{
snapshotReceivedEvent.Set();
Expand Down
2 changes: 1 addition & 1 deletion ReactiveXComponentTest/RabbitMqTestApi.xcApi
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
<threading />
<serialization>Binary</serialization>
<communication>
<bus name="rabbitmq" host="127.0.0.1" virtualHost="myVirtualHost" port="5671" user="guest" password="guest" type="RABBIT_MQ" sslEnabled="True" sslServerName="XComponent RMq" sslCertPath="some_cert_path" sslCertPassphrase="some_cert_pass" sslProtocol="Default" sslAllowUntrustedServerCertificate="True" />
<bus name="rabbitmq" host="127.0.0.1" virtualHost="myVirtualHost" port="5671" user="guest" password="guest" type="RABBIT_MQ" sslEnabled="True" sslServerName="XComponent RMq" sslCertPath="some_cert_path" sslCertPassphrase="some_cert_pass" sslProtocol="Tls12" sslAllowUntrustedServerCertificate="True" />
</communication>
<clientAPICommunication>
<publish componentCode="-69981087" stateMachineCode="-829536631" eventType="UPDATE" topicType="output" communicationType="BUS" stateCode="0" eventCode="9" event="XComponent.HelloWorld.UserObject.SayHello" communication="rabbitmq">
Expand Down
2 changes: 1 addition & 1 deletion ReactiveXComponentTest/WebSocket/WebSocketTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,7 @@ public void SnapshotTest()
List<MessageEventArgs> instances = new List<MessageEventArgs>();
Task.Run(async () =>
{
instances = await webSocketPublisher.GetSnapshotAsync(stateMachineName, 2);
instances = await webSocketPublisher.GetSnapshotAsync(stateMachineName, chunkSize:2);
}).GetAwaiter().OnCompleted(() =>
{
snapshotReceivedEvent.Set();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,18 +28,25 @@ class ProgramInBatchWithReactiveApi

static int Main()
{
for (int i = 0; i < MaxNumberOfTries; i++)
var triesCount = 0;

while (triesCount < MaxNumberOfTries)
{
try
{
triesCount++;
int result = TryToRunTest();
return result;
}
catch (Exception e)
{
throw new ReactiveXComponentException("Error while running test: " + e.Message, e);
if (triesCount == MaxNumberOfTries)
{
throw new ReactiveXComponentException("Error while running test: " + e.Message, e);
}
}
}

Console.WriteLine("Exited test, failed to run");
return TooManyTriesExitCode;
}
Expand All @@ -60,6 +67,8 @@ private static int TryToRunTest()
{
TestCreateInstanceReactiveApi(publisher, createdInstanceIds, subscriber);

TestGetSnapshotWithFilter(publisher, createdInstanceIds);

TestDestroyInstanceReactiveApi(subscriber, createdInstanceIds, publisher);

return 0;
Expand Down Expand Up @@ -94,7 +103,7 @@ private static void TestDestroyInstanceReactiveApi(IXCSubscriber subscriber, Con
List<MessageEventArgs> instanceList = createdInstanceIds.Values.ToList();
foreach (var helloWorldResponseInstance in instanceList)
{
publisher.SendEvent(helloWorldResponseInstance.StateMachineRefHeader, new SayGoodbye() {Name = "toto"});
publisher.SendEvent(helloWorldResponseInstance.StateMachineRefHeader, new SayGoodbye());
}

if (!waitForInstanceDestruction.WaitOne(TimeToWaitForInstanceState))
Expand Down Expand Up @@ -133,7 +142,7 @@ private static void TestCreateInstanceReactiveApi(IXCPublisher publisher, Concur
{
for (int i = 0; i < NumberOfInstances; i++)
{
publisher.SendEvent(entryPointSnapshot.StateMachineRefHeader, new SayHello() {Name = "toto"});
publisher.SendEvent(entryPointSnapshot.StateMachineRefHeader, new SayHello() {Name = $"toto{i}"});
}

if (!waitForInstanceCreation.WaitOne(TimeToWaitForInstanceState))
Expand All @@ -144,6 +153,32 @@ private static void TestCreateInstanceReactiveApi(IXCPublisher publisher, Concur
}
}

private static void TestGetSnapshotWithFilter(
IXCPublisher publisher,
ConcurrentDictionary<string, MessageEventArgs> createdInstanceIds)
{
Console.WriteLine("Testing get snapshot with filter");

for (int i = 0; i < NumberOfInstances; i++)
{
var filter = $"OriginatorName == \"toto{i}\"";
var instances = publisher.GetSnapshot(ResponseStateMachineName, filter);

if (instances == null)
{
Console.WriteLine($"Received a null snapshot for instance with filter = {filter}");
throw new Exception("Error while testing snapshot");
}

if (instances.Count != 1)
{
Console.WriteLine($"Received {instances.Count} instances for get snapshot for instance with filter = {filter}. Expected only one.");
throw new Exception("Error while testing snapshot");
}
}

}

private static void ObserverStateMachineExecuted(int instanceCount, AutoResetEvent waitForInstanceState, string stateInstance)
{
Console.WriteLine($"{instanceCount} instance " + stateInstance);
Expand Down