From 9ca8967a82fb2a00ce45452dfe8afe7bcf682548 Mon Sep 17 00:00:00 2001 From: Andrei Marculescu Date: Tue, 24 Jul 2018 17:29:21 +0200 Subject: [PATCH 1/4] XCOMP-1177: migrating to string state machine ids in order to support xcomponent versions > 5.0 --- ReactiveXComponent/Common/StateMachineInstance.cs | 2 +- ReactiveXComponent/Common/StateMachineRefHeader.cs | 4 ++-- ReactiveXComponent/Common/WebSocketEngineHeader.cs | 2 +- ReactiveXComponent/RabbitMq/RabbitMqHeaderConverter.cs | 8 ++++---- ReactiveXComponent/WebSocket/WebSocketSubscriber.cs | 2 +- ReactiveXComponentTest/RabbitMq/RabbitMqTests.cs | 8 ++++---- ReactiveXComponentTest/WebSocket/WebSocketTests.cs | 6 +++--- 7 files changed, 16 insertions(+), 16 deletions(-) diff --git a/ReactiveXComponent/Common/StateMachineInstance.cs b/ReactiveXComponent/Common/StateMachineInstance.cs index 9db70dc..539464a 100644 --- a/ReactiveXComponent/Common/StateMachineInstance.cs +++ b/ReactiveXComponent/Common/StateMachineInstance.cs @@ -5,7 +5,7 @@ namespace ReactiveXComponent.Common [Serializable] public class StateMachineInstance { - public long StateMachineId { get; set; } + public string StateMachineId { get; set; } public int AgentId { get; set; } diff --git a/ReactiveXComponent/Common/StateMachineRefHeader.cs b/ReactiveXComponent/Common/StateMachineRefHeader.cs index 05a2179..b39bb8d 100644 --- a/ReactiveXComponent/Common/StateMachineRefHeader.cs +++ b/ReactiveXComponent/Common/StateMachineRefHeader.cs @@ -20,7 +20,7 @@ public override int GetHashCode() { unchecked { - var hashCode = StateMachineId.GetHashCode(); + var hashCode = (StateMachineId != null ? StateMachineId.GetHashCode() : 0); hashCode = (hashCode * 397) ^ StateCode; hashCode = (hashCode * 397) ^ StateMachineCode; hashCode = (hashCode * 397) ^ ComponentCode; @@ -32,7 +32,7 @@ public override int GetHashCode() } } - public long StateMachineId { get; set; } + public string StateMachineId { get; set; } public int StateCode { get; set; } diff --git a/ReactiveXComponent/Common/WebSocketEngineHeader.cs b/ReactiveXComponent/Common/WebSocketEngineHeader.cs index 90b3dbf..24cb539 100644 --- a/ReactiveXComponent/Common/WebSocketEngineHeader.cs +++ b/ReactiveXComponent/Common/WebSocketEngineHeader.cs @@ -13,7 +13,7 @@ namespace ReactiveXComponent.Common /// public class WebSocketEngineHeader { - public long? StateMachineId { get; set; } + public string StateMachineId { get; set; } public int? StateMachineCode { get; set; } diff --git a/ReactiveXComponent/RabbitMq/RabbitMqHeaderConverter.cs b/ReactiveXComponent/RabbitMq/RabbitMqHeaderConverter.cs index 780dcd5..d592bb2 100644 --- a/ReactiveXComponent/RabbitMq/RabbitMqHeaderConverter.cs +++ b/ReactiveXComponent/RabbitMq/RabbitMqHeaderConverter.cs @@ -32,7 +32,7 @@ public static Dictionary CreateHeaderFromStateMachineRefHeader(S var dico = new Dictionary { - {HeaderElement.StateMachineId, stateMachineRefHeader?.StateMachineId}, + {HeaderElement.StateMachineId, stateMachineRefHeader?.StateMachineId != null ? Encoding.UTF8.GetBytes(stateMachineRefHeader.StateMachineId) : encoding.GetBytes(string.Empty)}, {HeaderElement.StateCode, stateMachineRefHeader?.StateCode}, {HeaderElement.StateMachineCode, stateMachineRefHeader?.StateMachineCode}, {HeaderElement.ComponentCode, stateMachineRefHeader?.ComponentCode}, @@ -48,7 +48,7 @@ public static Dictionary CreateHeaderFromStateMachineRefHeader(S public static StateMachineRefHeader ConvertStateMachineRefHeader(IDictionary stateMachineRefHeader) { var encoding = new UTF8Encoding(); - var stateMachineId = -1; + string stateMachineId = null; var stateCode = -1; var stateMachineCode = -1; var componentCode = -1; @@ -57,8 +57,8 @@ public static StateMachineRefHeader ConvertStateMachineRefHeader(IDictionary Date: Fri, 27 Jul 2018 18:00:30 +0200 Subject: [PATCH 2/4] Adding support for chunks in snapshots --- ReactiveXComponent/Common/ChunkCountdown.cs | 40 ++++ .../Common/ChunkedSnapshotEvent.cs | 11 + .../GZipSnapshotItemArrayJsonConverter.cs | 62 ++++++ ReactiveXComponent/Common/SnapshotEvent.cs | 12 ++ ReactiveXComponent/Common/SnapshotItem.cs | 36 ++++ ReactiveXComponent/Common/SnapshotItems.cs | 11 - ReactiveXComponent/Common/SnapshotMessage.cs | 1 + ReactiveXComponent/Common/SnapshotResponse.cs | 19 ++ .../Common/SnapshotResponseChunk.cs | 28 +++ ReactiveXComponent/Connection/IXCPublisher.cs | 6 +- .../RabbitMq/RabbitMqPublisher.cs | 9 +- .../RabbitMq/RabbitMqSnapshotManager.cs | 197 ++++++++++++------ ReactiveXComponent/ReactiveXComponent.csproj | 8 +- .../WebSocket/WebSocketClient.cs | 2 + .../WebSocket/WebSocketMessageHelper.cs | 19 +- .../WebSocket/WebSocketPublisher.cs | 9 +- .../WebSocket/WebSocketSnapshotManager.cs | 65 +++--- .../WebSocket/WebSocketSnapshotMessage.cs | 4 +- .../RabbitMq/RabbitMqTests.cs | 131 ++++++++---- .../WebSocket/WebSocketTests.cs | 19 +- 20 files changed, 494 insertions(+), 195 deletions(-) create mode 100644 ReactiveXComponent/Common/ChunkCountdown.cs create mode 100644 ReactiveXComponent/Common/ChunkedSnapshotEvent.cs create mode 100644 ReactiveXComponent/Common/GZipSnapshotItemArrayJsonConverter.cs create mode 100644 ReactiveXComponent/Common/SnapshotEvent.cs create mode 100644 ReactiveXComponent/Common/SnapshotItem.cs delete mode 100644 ReactiveXComponent/Common/SnapshotItems.cs create mode 100644 ReactiveXComponent/Common/SnapshotResponse.cs create mode 100644 ReactiveXComponent/Common/SnapshotResponseChunk.cs diff --git a/ReactiveXComponent/Common/ChunkCountdown.cs b/ReactiveXComponent/Common/ChunkCountdown.cs new file mode 100644 index 0000000..f195f2a --- /dev/null +++ b/ReactiveXComponent/Common/ChunkCountdown.cs @@ -0,0 +1,40 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading; +using System.Threading.Tasks; + +namespace ReactiveXComponent.Common +{ + public class ChunkCountdown : IDisposable + { + private readonly ManualResetEvent _completionResetEvent = new System.Threading.ManualResetEvent(false); + + private const int ValueNotSet = int.MinValue; + private int _countdown = ValueNotSet; + + public void SetValueIfNotInitialized(int countdown) + { + Interlocked.CompareExchange(ref _countdown, countdown, ValueNotSet); + } + + public void Decrement() + { + bool isCompleted = Interlocked.Decrement(ref _countdown) <= 0; + if (isCompleted) + { + _completionResetEvent.Set(); + } + } + + public WaitHandle CompletionResetEvent => _completionResetEvent; + + public int Countdown => _countdown; + + public void Dispose() + { + _completionResetEvent?.Dispose(); + } + } +} diff --git a/ReactiveXComponent/Common/ChunkedSnapshotEvent.cs b/ReactiveXComponent/Common/ChunkedSnapshotEvent.cs new file mode 100644 index 0000000..bb5cd88 --- /dev/null +++ b/ReactiveXComponent/Common/ChunkedSnapshotEvent.cs @@ -0,0 +1,11 @@ +using System; + +namespace ReactiveXComponent.Common +{ + public class ChunkedSnapshotEvent + { + public string RequestId { get; set; } + + public SnapshotResponseChunk SnapshotResponseChunk { get; set; } + } +} diff --git a/ReactiveXComponent/Common/GZipSnapshotItemArrayJsonConverter.cs b/ReactiveXComponent/Common/GZipSnapshotItemArrayJsonConverter.cs new file mode 100644 index 0000000..6e3442d --- /dev/null +++ b/ReactiveXComponent/Common/GZipSnapshotItemArrayJsonConverter.cs @@ -0,0 +1,62 @@ +using System; +using System.Collections.Generic; +using System.Globalization; +using System.IO; +using System.IO.Compression; +using System.Linq; +using System.Text; +using System.Threading.Tasks; +using Newtonsoft.Json; + +namespace ReactiveXComponent.Common +{ + public class GZipSnapshotItemArrayJsonConverter : JsonConverter + { + public override void WriteJson(JsonWriter writer, object value, JsonSerializer serializer) + { + var stringWriter = new StringWriter(new StringBuilder(256), CultureInfo.InvariantCulture); + using (var jsonTextWriter = new JsonTextWriter(stringWriter)) + { + jsonTextWriter.Formatting = serializer.Formatting; + serializer.Serialize(jsonTextWriter, value); + var input = Encoding.UTF8.GetBytes(stringWriter.ToString()); + using (var ms = new MemoryStream()) + { + using (var compressedStream = new GZipStream(ms, CompressionMode.Compress)) + { + compressedStream.Write(input, 0, input.Length); + compressedStream.Close(); + } + var compressedBase64 = Convert.ToBase64String(ms.ToArray()); + serializer.Serialize(writer, compressedBase64, typeof(string)); + } + } + } + + public override object ReadJson(JsonReader reader, Type objectType, object existingValue, JsonSerializer serializer) + { + var input = reader.Value as string; + if (input != null) + { + var compressedInput = Convert.FromBase64String(input); + using (var ms = new MemoryStream(compressedInput)) + { + using (var decompressedStream = new GZipStream(ms, CompressionMode.Decompress)) + { + using (var streamReader = new StreamReader(decompressedStream)) + { + return serializer.Deserialize(streamReader, typeof(T)); + } + } + } + } + + return null; + } + + public override bool CanConvert(Type objectType) + { + return typeof(T) == objectType; + } + } +} diff --git a/ReactiveXComponent/Common/SnapshotEvent.cs b/ReactiveXComponent/Common/SnapshotEvent.cs new file mode 100644 index 0000000..32cd005 --- /dev/null +++ b/ReactiveXComponent/Common/SnapshotEvent.cs @@ -0,0 +1,12 @@ +using System; +using System.Collections.Generic; + +namespace ReactiveXComponent.Common +{ + public class SnapshotEvent + { + public string RequestId { get; set; } + + public SnapshotResponse SnapshotResponse { get; set; } + } +} diff --git a/ReactiveXComponent/Common/SnapshotItem.cs b/ReactiveXComponent/Common/SnapshotItem.cs new file mode 100644 index 0000000..e2e882f --- /dev/null +++ b/ReactiveXComponent/Common/SnapshotItem.cs @@ -0,0 +1,36 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace ReactiveXComponent.Common +{ + public class SnapshotItem + { + public const int DefaultIntValue = -1; + public const string DefaultStringValue = ""; + + public string StateMachineId { get; set; } + + public int WorkerId { get; set; } + + public int StateMachineCode { get; set; } + + public int StateCode { get; set; } + + public int ComponentCode { get; set; } + + public object PublicMember { get; set; } + + public SnapshotItem() + { + StateMachineId = DefaultStringValue; + WorkerId = DefaultIntValue; + StateMachineCode = DefaultIntValue; + StateCode = DefaultIntValue; + ComponentCode = DefaultIntValue; + PublicMember = null; + } + } +} diff --git a/ReactiveXComponent/Common/SnapshotItems.cs b/ReactiveXComponent/Common/SnapshotItems.cs deleted file mode 100644 index 05d4418..0000000 --- a/ReactiveXComponent/Common/SnapshotItems.cs +++ /dev/null @@ -1,11 +0,0 @@ - -using System; - -namespace ReactiveXComponent.Common -{ - [Serializable] - public class SnapshotItems - { - public string Items; - } -} diff --git a/ReactiveXComponent/Common/SnapshotMessage.cs b/ReactiveXComponent/Common/SnapshotMessage.cs index b44766e..4470d43 100644 --- a/ReactiveXComponent/Common/SnapshotMessage.cs +++ b/ReactiveXComponent/Common/SnapshotMessage.cs @@ -13,5 +13,6 @@ public class SnapshotMessage public TimeSpan? Timeout { get; set; } = DefaultTimeout; public List CallerPrivateTopic { get; set; } public string ReplyTopic { get; set; } + public int? ChunkSize { get; set; } } } diff --git a/ReactiveXComponent/Common/SnapshotResponse.cs b/ReactiveXComponent/Common/SnapshotResponse.cs new file mode 100644 index 0000000..b6dd4ec --- /dev/null +++ b/ReactiveXComponent/Common/SnapshotResponse.cs @@ -0,0 +1,19 @@ + +using System; +using System.Collections.Generic; +using Newtonsoft.Json; + +namespace ReactiveXComponent.Common +{ + [Serializable] + public class SnapshotResponse + { + [JsonConverter(typeof(GZipSnapshotItemArrayJsonConverter>))] + public List Items { get; set; } + + public SnapshotResponse() + { + Items = new List(); + } + } +} diff --git a/ReactiveXComponent/Common/SnapshotResponseChunk.cs b/ReactiveXComponent/Common/SnapshotResponseChunk.cs new file mode 100644 index 0000000..d1e0f4f --- /dev/null +++ b/ReactiveXComponent/Common/SnapshotResponseChunk.cs @@ -0,0 +1,28 @@ +using System; +using System.Collections.Generic; + +namespace ReactiveXComponent.Common +{ + public class SnapshotResponseChunk + { + public SnapshotResponseChunk() + { + } + + public SnapshotResponseChunk(string runtimeId, List knownRuntimeIds) + { + RuntimeId = runtimeId; + KnownRuntimeIds = knownRuntimeIds; + } + + public int ChunkCount { get; set; } + + public int ChunkId { get; set; } + + public string RuntimeId { get; set; } + + public List KnownRuntimeIds { get; set; } + + public SnapshotResponse Response { get; set; } + } +} diff --git a/ReactiveXComponent/Connection/IXCPublisher.cs b/ReactiveXComponent/Connection/IXCPublisher.cs index 40bf044..f38e37b 100644 --- a/ReactiveXComponent/Connection/IXCPublisher.cs +++ b/ReactiveXComponent/Connection/IXCPublisher.cs @@ -1,7 +1,7 @@ using System; using System.Collections.Generic; +using System.Threading.Tasks; using ReactiveXComponent.Common; -using ReactiveXComponent.RabbitMq; namespace ReactiveXComponent.Connection { @@ -10,7 +10,7 @@ public interface IXCPublisher : IDisposable void SendEvent(string stateMachine, object message, string messageType, Visibility visibility = Visibility.Public); void SendEvent(string stateMachine, object message, Visibility visibility = Visibility.Public); void SendEvent(StateMachineRefHeader stateMachineRefHeader, object message, Visibility visibility = Visibility.Public); - List GetSnapshot(string stateMachine, int timeout = 10000); - void GetSnapshotAsync(string stateMachine, Action> onSnapshotReceived); + List GetSnapshot(string stateMachine, int? chunkSize, int timeout = 10000); + Task> GetSnapshotAsync(string stateMachine, int? chunkSize, int timeout = 10000); } } \ No newline at end of file diff --git a/ReactiveXComponent/RabbitMq/RabbitMqPublisher.cs b/ReactiveXComponent/RabbitMq/RabbitMqPublisher.cs index 9b5dc67..4aa156a 100644 --- a/ReactiveXComponent/RabbitMq/RabbitMqPublisher.cs +++ b/ReactiveXComponent/RabbitMq/RabbitMqPublisher.cs @@ -1,6 +1,7 @@ using System; using System.Collections.Generic; using System.IO; +using System.Threading.Tasks; using RabbitMQ.Client; using ReactiveXComponent.Common; using ReactiveXComponent.Configuration; @@ -69,14 +70,14 @@ public void SendEvent(StateMachineRefHeader stateMachineRefHeader, object messag Send(message, routingKey, prop); } - public List GetSnapshot(string stateMachine, int timeout = 10000) + public List GetSnapshot(string stateMachine, int? chunkSize, int timeout = 10000) { - return _rabbitMqSnapshotManager.GetSnapshot(stateMachine, timeout); + return _rabbitMqSnapshotManager.GetSnapshot(stateMachine, chunkSize, timeout); } - public void GetSnapshotAsync(string stateMachine, Action> onSnapshotReceived) + public Task> GetSnapshotAsync(string stateMachine, int? chunkSize, int timeout = 10000) { - _rabbitMqSnapshotManager.GetSnapshotAsync(stateMachine, onSnapshotReceived); + return _rabbitMqSnapshotManager.GetSnapshotAsync(stateMachine, chunkSize, timeout); } #endregion diff --git a/ReactiveXComponent/RabbitMq/RabbitMqSnapshotManager.cs b/ReactiveXComponent/RabbitMq/RabbitMqSnapshotManager.cs index bff3157..417e44f 100644 --- a/ReactiveXComponent/RabbitMq/RabbitMqSnapshotManager.cs +++ b/ReactiveXComponent/RabbitMq/RabbitMqSnapshotManager.cs @@ -3,9 +3,11 @@ using System.Collections.Generic; using System.IO; using System.IO.Compression; +using System.Linq; using System.Reactive.Linq; using System.Text; using System.Threading; +using System.Threading.Tasks; using Newtonsoft.Json; using RabbitMQ.Client; using RabbitMQ.Client.Events; @@ -29,9 +31,8 @@ public class RabbitMqSnapshotManager : IDisposable private readonly ConcurrentDictionary _subscribers; private IModel _snapshotChannel; - private event EventHandler> SnapshotReceived; + private event EventHandler SnapshotReceived; private event EventHandler ConnectionFailed; - private IObservable> _snapshotStream; public RabbitMqSnapshotManager(IConnection connection, string component, IXCConfiguration configuration, ISerializer serializer, string privateCommunicationIdentifier = null) { @@ -43,7 +44,6 @@ public RabbitMqSnapshotManager(IConnection connection, string component, IXCConf _subscribers = new ConcurrentDictionary(); InitSerializationType(); CreateSnapshotChannel(connection); - InitObservableCollection(); } private void CreateSnapshotChannel(IConnection connection) @@ -56,49 +56,138 @@ private void CreateSnapshotChannel(IConnection connection) _snapshotChannel.ExchangeDeclare(exchangeName, ExchangeType.Topic); } - private void InitObservableCollection() + private static SnapshotResponse AggregateChunks(ConcurrentBag snapshotChunks) { - _snapshotStream = Observable.FromEvent>, List>( - handler => (sender, e) => handler(e), - h => SnapshotReceived += h, - h => SnapshotReceived -= h); + if (snapshotChunks.Count == 0) + { + return null; + } + + var aggregatedResult = new SnapshotResponse(); + + SnapshotResponseChunk chunk; + while (snapshotChunks.TryTake(out chunk)) + { + aggregatedResult.Items.AddRange(chunk.Response.Items); + } + return aggregatedResult; } - public List GetSnapshot(string stateMachine, int timeout = 10000) + public List GetSnapshot(string stateMachine, int? chunkSize, int timeout = 10000) { var guid = Guid.NewGuid(); + var requestId = guid.ToString(); List result = null; - var lockEvent = new AutoResetEvent(false); - var handler = new EventHandler>((sender, args) => + + var receivedSnapshotChunksInitialized = new AutoResetEvent(false); + var receivedSnapshotChunks = new ConcurrentDictionary(); + var snapshotChunks = new ConcurrentBag(); + List invalidChunks = new List(); + + try { - result = new List(args); - lockEvent.Set(); - }); - SnapshotReceived += handler; + EventHandler snapshotListenerOnMessageReceived = (sender, args) => + { + if (args.RequestId != requestId) + { + return; + } + + var chunk = args.SnapshotResponseChunk; + if (chunk == null) + { + invalidChunks.Add(args); + } + else + { + snapshotChunks.Add(chunk); + foreach (var runtimeId in chunk.KnownRuntimeIds) + { + receivedSnapshotChunks.TryAdd(runtimeId, new ChunkCountdown()); + } + receivedSnapshotChunksInitialized.Set(); + + ChunkCountdown chunkCountdown; + if (receivedSnapshotChunks.TryGetValue(chunk.RuntimeId, out chunkCountdown)) + { + chunkCountdown.SetValueIfNotInitialized(chunk.ChunkCount); + chunkCountdown.Decrement(); + } + } + }; - SubscribeSnapshot(stateMachine, guid.ToString()); - SendSnapshotRequest(stateMachine, guid, _privateCommunicationIdentifier); + SnapshotReceived += snapshotListenerOnMessageReceived; - lockEvent.WaitOne(timeout); + SubscribeSnapshot(stateMachine, requestId); + SendSnapshotRequest(stateMachine, requestId, chunkSize, _privateCommunicationIdentifier); - SnapshotReceived -= handler; - UnsubscribeSnapshot(stateMachine); + if (receivedSnapshotChunksInitialized.WaitOne(timeout)) + { + var completionEvents = + receivedSnapshotChunks.Values.Select(ccd => ccd.CompletionResetEvent).ToArray(); + if (WaitHandle.WaitAll(completionEvents, timeout)) + { + SnapshotReceived -= snapshotListenerOnMessageReceived; + return AggregateChunks(snapshotChunks).Items.Select(item => + new MessageEventArgs(new StateMachineRefHeader() + { + StateMachineId = item.StateMachineId, + StateMachineCode = item.StateMachineCode, + ComponentCode = item.ComponentCode, + StateCode = item.StateCode + }, + item.PublicMember, + _serializationType)).ToList(); + } + } + + SnapshotReceived -= snapshotListenerOnMessageReceived; + + UnsubscribeSnapshot(stateMachine); + + if (invalidChunks.Count > 0) + { + throw new ReactiveXComponentException($"A number of {invalidChunks.Count} chunks are not of the expected type {nameof(SnapshotResponseChunk)}"); + } + + if (!receivedSnapshotChunks.IsEmpty) + { + var errorMessage = new StringBuilder("The snapshot was incomplete: "); + foreach (var remainingSnapshotChunk in receivedSnapshotChunks) + { + var remainingChunkCount = remainingSnapshotChunk.Value.Countdown == Int32.MinValue + ? "all" + : remainingSnapshotChunk.Value.Countdown.ToString(); + errorMessage.Append( + $" runtime {remainingSnapshotChunk.Key} missing {remainingChunkCount} chunk(s);"); + } + + throw new ReactiveXComponentException(errorMessage.ToString()); + } + } + catch (Exception e) + { + throw new ReactiveXComponentException("Error encoutered while requesting snapshot: " + e.Message, e); + } + finally + { + foreach (var chunkCountdown in receivedSnapshotChunks.Values) + { + chunkCountdown.Dispose(); + } + + receivedSnapshotChunksInitialized.Dispose(); + } return result; } - public void GetSnapshotAsync(string stateMachine, Action> onSnapshotReceived) + public Task> GetSnapshotAsync(string stateMachine, int? chunkSize, int timeout = 10000) { - var guid = Guid.NewGuid(); - if (onSnapshotReceived != null) - { - _snapshotStream.Subscribe(onSnapshotReceived); - } - SubscribeSnapshot(stateMachine, guid.ToString()); - SendSnapshotRequest(stateMachine, guid, _privateCommunicationIdentifier); + return Task.Run(() => GetSnapshot(stateMachine, chunkSize, timeout)); } - private void SendSnapshotRequest(string stateMachine, Guid guid, string privateCommunicationIdentifier = null) + private void SendSnapshotRequest(string stateMachine, string replyTopic, int? chunkSize, string privateCommunicationIdentifier = null) { if (_xcConfiguration == null) return; @@ -118,10 +207,11 @@ private void SendSnapshotRequest(string stateMachine, Guid guid, string privateC var snapshotMessage = new SnapshotMessage() { Timeout = TimeSpan.FromSeconds(10), - ReplyTopic = guid.ToString(), + ReplyTopic = replyTopic, CallerPrivateTopic = !string.IsNullOrEmpty(privateCommunicationIdentifier) ? new List{ privateCommunicationIdentifier} : null, + ChunkSize = chunkSize }; Send(snapshotMessage, routingKey, prop); @@ -221,7 +311,7 @@ private void ReceiveMessage(SubscriptionKey subscriberKey) rabbitMqSubscriberInfos.Subscriber.Received += (o, e) => { rabbitMqSubscriberInfos.Channel?.BasicAck(e.DeliveryTag, false); - DispatchMessage(e); + DispatchMessage(e, subscriberKey.RoutingKey); }; } catch (EndOfStreamException ex) @@ -230,48 +320,19 @@ private void ReceiveMessage(SubscriptionKey subscriberKey) } } - private void DispatchMessage(BasicDeliverEventArgs basicAckEventArgs) + private void DispatchMessage(BasicDeliverEventArgs basicAckEventArgs, string requestId) { var obj = _serializer.Deserialize(new MemoryStream(basicAckEventArgs.Body)); - var stateMachineRefHeader = - RabbitMqHeaderConverter.ConvertStateMachineRefHeader(basicAckEventArgs.BasicProperties.Headers); - var zipedObj = JsonConvert.DeserializeObject(obj.ToString()); - byte[] compressed = Convert.FromBase64String(zipedObj.Items); - var compressedMessage = new MemoryStream(compressed); - var decompressedMessage = new MemoryStream(); - - using (var tmpMessage = new GZipStream(compressedMessage, CompressionMode.Decompress)) - { - tmpMessage.CopyTo(decompressedMessage); - } - - var message = Encoding.UTF8.GetString(decompressedMessage.ToArray()); - - var msgEventArgs = new MessageEventArgs(stateMachineRefHeader, message, _serializationType); - - OnSnapshotReceived(msgEventArgs); - } - - private void OnSnapshotReceived(MessageEventArgs e) - { - var stateMachineInstances = new List(); - var messageReceived = - JsonConvert.DeserializeObject>(e.MessageReceived.ToString()); - foreach (var element in messageReceived) + var snapshotResponseChunk = JsonConvert.DeserializeObject(obj.ToString()); + + var chunkedSnapshotEvent = new ChunkedSnapshotEvent { - var stateMachineRefHeader = new StateMachineRefHeader() - { - StateMachineId = element.StateMachineId, - ComponentCode = element.ComponentCode, - StateMachineCode = element.StateMachineCode, - StateCode = element.StateCode - }; + RequestId = requestId, + SnapshotResponseChunk = snapshotResponseChunk + }; - var messageEventArgs = new MessageEventArgs(stateMachineRefHeader, element.PublicMember, _serializationType); - stateMachineInstances.Add(messageEventArgs); - } - SnapshotReceived?.Invoke(this, stateMachineInstances); + SnapshotReceived?.Invoke(this, chunkedSnapshotEvent); } private void UnsubscribeSnapshot(string stateMachine) diff --git a/ReactiveXComponent/ReactiveXComponent.csproj b/ReactiveXComponent/ReactiveXComponent.csproj index 301108a..cc78bbb 100644 --- a/ReactiveXComponent/ReactiveXComponent.csproj +++ b/ReactiveXComponent/ReactiveXComponent.csproj @@ -73,13 +73,19 @@ + + + - + + + + diff --git a/ReactiveXComponent/WebSocket/WebSocketClient.cs b/ReactiveXComponent/WebSocket/WebSocketClient.cs index e0db955..da0b8aa 100644 --- a/ReactiveXComponent/WebSocket/WebSocketClient.cs +++ b/ReactiveXComponent/WebSocket/WebSocketClient.cs @@ -1,4 +1,5 @@ using System; +using System.Security.Authentication; using System.Text.RegularExpressions; using System.Threading; using ReactiveXComponent.Common; @@ -31,6 +32,7 @@ private void OpenConnection() _webSocket.Security.AllowUnstrustedCertificate = true; _webSocket.Security.AllowNameMismatchCertificate = true; + _webSocket.Security.EnabledSslProtocols = SslProtocols.Tls12; _webSocket.Opened += WebSocketOnOpened; _webSocket.Closed += WebSocketOnClosed; diff --git a/ReactiveXComponent/WebSocket/WebSocketMessageHelper.cs b/ReactiveXComponent/WebSocket/WebSocketMessageHelper.cs index 3cf57d8..55aa5cc 100644 --- a/ReactiveXComponent/WebSocket/WebSocketMessageHelper.cs +++ b/ReactiveXComponent/WebSocket/WebSocketMessageHelper.cs @@ -193,25 +193,12 @@ private static string SerializeToString(object message) } } - public static string DeserializeSnapshot(string receivedMessage) + public static SnapshotResponse DeserializeSnapshot(string receivedMessage) { var snapshotHeader = JsonConvert.DeserializeObject(receivedMessage); - var zipedMessage = JsonConvert.DeserializeObject(snapshotHeader.JsonMessage); - byte[] compressedMessage = Convert.FromBase64String(zipedMessage.Items); - string message; - var compressedStream = new MemoryStream(compressedMessage); + var snapshotResponse = JsonConvert.DeserializeObject(snapshotHeader.JsonMessage); - using (var decompressedMessage = new MemoryStream()) - { - using (var unzippedMessage = new GZipStream(compressedStream, CompressionMode.Decompress)) - { - unzippedMessage.CopyTo(decompressedMessage); - } - - message = Encoding.UTF8.GetString(decompressedMessage.ToArray()); - } - - return message; + return snapshotResponse; } public static string DeserializeXCApi(string receivedMessage) diff --git a/ReactiveXComponent/WebSocket/WebSocketPublisher.cs b/ReactiveXComponent/WebSocket/WebSocketPublisher.cs index 002bd6c..0f50bdf 100644 --- a/ReactiveXComponent/WebSocket/WebSocketPublisher.cs +++ b/ReactiveXComponent/WebSocket/WebSocketPublisher.cs @@ -1,5 +1,6 @@ using System; using System.Collections.Generic; +using System.Threading.Tasks; using ReactiveXComponent.Common; using ReactiveXComponent.Configuration; using ReactiveXComponent.Connection; @@ -64,14 +65,14 @@ public void SendEvent(StateMachineRefHeader stateMachineRefHeader, object messag _webSocketClient.Send(webSocketRequest); } - public List GetSnapshot(string stateMachine, int timeout = 10000) + public List GetSnapshot(string stateMachine, int? chunkSize, int timeout = 10000) { - return _webSocketSnapshotManager.GetSnapshot(stateMachine, timeout); + return _webSocketSnapshotManager.GetSnapshot(stateMachine, chunkSize, timeout); } - public void GetSnapshotAsync(string stateMachine, Action> onSnapshotReceived) + public Task> GetSnapshotAsync(string stateMachine, int? chunkSize, int timeout = 10000) { - _webSocketSnapshotManager.GetSnapshotAsync(stateMachine, onSnapshotReceived); + return _webSocketSnapshotManager.GetSnapshotAsync(stateMachine, chunkSize, timeout); } #endregion diff --git a/ReactiveXComponent/WebSocket/WebSocketSnapshotManager.cs b/ReactiveXComponent/WebSocket/WebSocketSnapshotManager.cs index 3b5faf2..0a62c71 100644 --- a/ReactiveXComponent/WebSocket/WebSocketSnapshotManager.cs +++ b/ReactiveXComponent/WebSocket/WebSocketSnapshotManager.cs @@ -1,9 +1,11 @@ using System; using System.Collections.Concurrent; using System.Collections.Generic; +using System.Linq; using System.Reactive; using System.Reactive.Linq; using System.Threading; +using System.Threading.Tasks; using Newtonsoft.Json; using ReactiveXComponent.Common; using ReactiveXComponent.Configuration; @@ -21,9 +23,9 @@ public class WebSocketSnapshotManager : IDisposable private readonly ConcurrentDictionary _streamSubscriptionsDico; private SerializationType _serializationType; - private event EventHandler> SnapshotReceived; + private event EventHandler SnapshotReceived; - private readonly IObservable> _snapshotStream; + private readonly IObservable _snapshotStream; public WebSocketSnapshotManager(string component, IWebSocketClient webSocketClient, IXCConfiguration xcConfiguration, string privateCommunicationIdentifier) { @@ -35,22 +37,34 @@ public WebSocketSnapshotManager(string component, IWebSocketClient webSocketClie _streamSubscriptionsDico = new ConcurrentDictionary(); InitSerializationType(); - _snapshotStream = Observable.FromEvent>, List>( + _snapshotStream = Observable.FromEvent, SnapshotEvent>( handler => (sender, e) => handler(e), h => SnapshotReceived += h, h => SnapshotReceived -= h); } - public List GetSnapshot(string stateMachine, int timeout = 10000) + public List GetSnapshot(string stateMachine, int? chunkSize, int timeout = 10000) { var replyTopic = Guid.NewGuid().ToString(); List result = null; var lockEvent = new AutoResetEvent(false); - var observer = Observer.Create>(message => + var observer = Observer.Create(arg => { - result = new List(message); - lockEvent.Set(); + if (arg.RequestId == replyTopic) + { + result = new List(arg.SnapshotResponse.Items.Select(item => + new MessageEventArgs(new StateMachineRefHeader() { + StateMachineId = item.StateMachineId, + StateMachineCode = item.StateMachineCode, + ComponentCode = item.ComponentCode, + StateCode = item.StateCode + }, + item.PublicMember, + _serializationType)).ToList()); + lockEvent.Set(); + } + }); EventHandler subscriptionHandler; @@ -60,7 +74,7 @@ public List GetSnapshot(string stateMachine, int timeout = 100 using (_snapshotStream.Subscribe(observer)) { SendWebSocketSnapshotSubscriptionResquest(replyTopic); - SendWebSocketSnapshotRequest(stateMachine, replyTopic); + SendWebSocketSnapshotRequest(stateMachine, replyTopic, chunkSize); lockEvent.WaitOne(timeout); } @@ -70,26 +84,12 @@ public List GetSnapshot(string stateMachine, int timeout = 100 return result; } - public void GetSnapshotAsync(string stateMachine, Action> onSnapshotReceived) + public Task> GetSnapshotAsync(string stateMachine, int? chunkSize, int timeout = 10000) { - var replyTopic = Guid.NewGuid().ToString(); - var componentCode = _xcConfiguration.GetComponentCode(_component); - var stateMachineCode = _xcConfiguration.GetStateMachineCode(_component, stateMachine); - var subscriptionKey = new SubscriptionKey(componentCode, stateMachineCode, replyTopic); - EventHandler subscriptionHandler; - - CreateSnapshotReplyHandler(replyTopic, out subscriptionHandler); - _webSocketClient.MessageReceived += subscriptionHandler; - var snapshotSubscription = _snapshotStream.Subscribe(onSnapshotReceived); - - SendWebSocketSnapshotSubscriptionResquest(replyTopic); - SendWebSocketSnapshotRequest(stateMachine, replyTopic); - - _subscriptions.AddOrUpdate(subscriptionKey, key => subscriptionHandler, (oldKey, oldValue) => oldValue); - _streamSubscriptionsDico.AddOrUpdate(subscriptionKey, key => snapshotSubscription, (oldKey, oldValue) => oldValue); + return Task.Run(() => GetSnapshot(stateMachine, chunkSize, timeout)); } - private void SendWebSocketSnapshotRequest(string stateMachine, string replyTopic) + private void SendWebSocketSnapshotRequest(string stateMachine, string replyTopic, int? chunkSize) { if (!_webSocketClient.IsOpen) return; @@ -101,7 +101,7 @@ private void SendWebSocketSnapshotRequest(string stateMachine, string replyTopic ComponentCode = componentCode, StateMachineCode = stateMachineCode }; - var snapshotMessage = new WebSocketSnapshotMessage(stateMachineCode, componentCode, replyTopic, _privateCommunicationIdentifier); + var snapshotMessage = new WebSocketSnapshotMessage(stateMachineCode, componentCode, replyTopic, _privateCommunicationIdentifier, chunkSize); var webSocketRequest = WebSocketMessageHelper.SerializeRequest( WebSocketCommand.Snapshot, inputHeader, @@ -122,11 +122,10 @@ private void CreateSnapshotReplyHandler(string replyTopic, out EventHandler(); - var snapshotReceived = JsonConvert.DeserializeObject>(receivedPacket); - foreach (var element in snapshotReceived) + foreach (var element in snapshotResponse.Items) { var stateMachineRefHeader = new StateMachineRefHeader() { @@ -140,7 +139,13 @@ private void CreateSnapshotReplyHandler(string replyTopic, out EventHandler x.ExchangeDeclare(null, null, true, true, null)).DoNotCallBase(); var queueDeclareOk = new QueueDeclareOk(QueueName, uint.MaxValue, uint.MaxValue); channel.QueueDeclare().ReturnsForAnyArgs(queueDeclareOk); - + var consumeAction = new Action((queueName, noAck, aconsumer) => { consumer = (EventingBasicConsumer)aconsumer; @@ -395,46 +403,50 @@ public void SnapshotTest() using (var snapshotReceivedEvent = new AutoResetEvent(false)) { List snapshotInstances = null; - - var snapshotHandler = new Action>(instances => + Task.Run(async () => + { + snapshotInstances = await publisher.GetSnapshotAsync(StateMachineA, chunkSize); + }).GetAwaiter().OnCompleted(() => { - snapshotInstances = instances; snapshotReceivedEvent.Set(); }); - publisher.GetSnapshotAsync(StateMachineA, snapshotHandler); - - var stateMachineRef = new StateMachineRefHeader() { + var stateMachineRef = new StateMachineRefHeader + { ComponentCode = componentCode, StateMachineCode = stateMachineCode, PrivateTopic = PublicRoutingKey }; + + var instances = new List(); - var stateMachineInstances = new List() + for (var i = 0; i < instancesCount; i++) { - new StateMachineInstance() - { + instances.Add(new StateMachineInstance() { ComponentCode = componentCode, StateMachineCode = stateMachineCode, PublicMember = TestMessage - } - }; - - var message = SerializeSnapshotInstances(stateMachineInstances); + }); + } - var basicProperties = new BasicProperties() - { + var chunks = SerializeSnapshotInstancesInChunks(instances, chunkSize); + var basicProperties = new BasicProperties { Headers = RabbitMqHeaderConverter.CreateHeaderFromStateMachineRefHeader(stateMachineRef, IncomingEventType.Snapshot, eventCode) }; - - consumer.HandleBasicDeliver(consumer.ConsumerTag, ulong.MaxValue, false, ExchangeName, snapshotReplyTopic, basicProperties, message); + + foreach (var chunk in chunks) + { + Thread.Sleep(100); + consumer.HandleBasicDeliver(consumer.ConsumerTag, ulong.MaxValue, false, ExchangeName, snapshotReplyTopic, basicProperties, chunk); + } var messageReceived = snapshotReceivedEvent.WaitOne(receptionTimeout); Check.That(messageReceived).IsTrue(); - Check.That(snapshotInstances.Count).IsEqualTo(1); + Check.That(snapshotInstances.Count).IsEqualTo(instancesCount); Check.That(snapshotInstances.FirstOrDefault()?.MessageReceived).IsInstanceOf(); Check.That((string)(snapshotInstances.FirstOrDefault()?.MessageReceived)).IsEqualTo(TestMessage); + snapshotReceivedEvent.Dispose(); } } @@ -453,39 +465,66 @@ private ISerializer GetSerializer(string serializer) return new BinarySerializer(); } } - private byte[] SerializeSnapshotInstances(List instances) + + private List SerializeSnapshotInstancesInChunks(List instances, int? chunkSize = 1) { - var serializedInstances = JsonConvert.SerializeObject(instances); - var stream = new MemoryStream(); - var streamWriter = new StreamWriter(stream); - streamWriter.Write(serializedInstances); - streamWriter.Flush(); - stream.Seek(0, SeekOrigin.Begin); - byte[] compressedBytes = null; - - using (var compressed = new MemoryStream()) + + var chunks = new List(); + var size = 0; + if (chunkSize.HasValue && chunkSize.Value <= 0) { - using (var compressor = new GZipStream(compressed, CompressionMode.Compress)) - { - stream.CopyTo(compressor); - } + size = instances.Count; + } + else + { + size = chunkSize.Value; + } - compressedBytes = compressed.ToArray(); + var chunksCount = 1; + if (instances.Count > size) + { + chunksCount = instances.Count / size + 1; } - var base64String = Convert.ToBase64String(compressedBytes, Base64FormattingOptions.None); - var snapshotItems = new SnapshotItems() + for (var i = 0; i < chunksCount; i++) { - Items = base64String - }; + var chunkInstances = new List(); + + for (var j = i* size; j < i* size + size && j < instances.Count; j++) + { + chunkInstances.Add(new SnapshotItem() + { + ComponentCode = instances[j].ComponentCode, + StateMachineCode = instances[j].StateMachineCode, + StateMachineId = instances[j].StateMachineId, + StateCode = instances[j].StateCode, + PublicMember = instances[j].PublicMember + }); + + } - var serializedMessage = JsonConvert.SerializeObject(snapshotItems); - var messageStream = new MemoryStream(); - GetSerializer(SnapshotSerialization).Serialize(messageStream, serializedMessage); - messageStream.Flush(); - messageStream.Seek(0, SeekOrigin.Begin); + var snapshotResponseChunk = new SnapshotResponseChunk + { + ChunkCount = chunksCount, + ChunkId = i, + KnownRuntimeIds = new List { "localhost" }, + Response = new SnapshotResponse + { + Items = chunkInstances + }, + RuntimeId = "localhost" + }; + + var serializedMessage = JsonConvert.SerializeObject(snapshotResponseChunk); + var messageStream = new MemoryStream(); + GetSerializer(SnapshotSerialization).Serialize(messageStream, serializedMessage); + messageStream.Flush(); + messageStream.Seek(0, SeekOrigin.Begin); + + chunks.Add(messageStream.ToArray()); + } - return messageStream.ToArray(); + return chunks; } } } diff --git a/ReactiveXComponentTest/WebSocket/WebSocketTests.cs b/ReactiveXComponentTest/WebSocket/WebSocketTests.cs index 4b69f8a..e588c4d 100644 --- a/ReactiveXComponentTest/WebSocket/WebSocketTests.cs +++ b/ReactiveXComponentTest/WebSocket/WebSocketTests.cs @@ -4,6 +4,7 @@ using System.Reactive; using System.Text; using System.Threading; +using System.Threading.Tasks; using Newtonsoft.Json; using NFluent; using NSubstitute; @@ -301,22 +302,18 @@ public void SnapshotTest() using (var webSocketPublisher = new WebSocketPublisher(componentName, webSocketClient, xcConfiguration, privateTopic)) using (var snapshotReceivedEvent = new AutoResetEvent(false)) { - var snapshotHandler = new Action>(args => + List instances = new List(); + Task.Run(async () => { - if (args.All(instance => instance.StateMachineRefHeader.ComponentCode == componentCode - && instance.StateMachineRefHeader.StateMachineCode == stateMachineCode)) - { - snapshotReceivedEvent.Set(); - } + instances = await webSocketPublisher.GetSnapshotAsync(stateMachineName, 2); + }).GetAwaiter().OnCompleted(() => + { + snapshotReceivedEvent.Set(); }); - webSocketPublisher.GetSnapshotAsync(stateMachineName, snapshotHandler); - var snapshotReceived = snapshotReceivedEvent.WaitOne(_timeout); Check.That(snapshotReceived).IsTrue(); - - var snapshot = webSocketPublisher.GetSnapshot(stateMachineName); - Check.That(snapshot.Count == 1); + Check.That(instances.Count == 1); } } From 202599d620d38c39cf7c7180fe6a8c321b946e98 Mon Sep 17 00:00:00 2001 From: "anouar.hassine" Date: Mon, 30 Jul 2018 17:01:00 +0200 Subject: [PATCH 3/4] Sanpshot - Adding a default value for chunkSize parameter --- ReactiveXComponent/Connection/IXCPublisher.cs | 4 ++-- ReactiveXComponent/RabbitMq/RabbitMqSnapshotManager.cs | 4 ++-- ReactiveXComponent/WebSocket/WebSocketSnapshotManager.cs | 4 ++-- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/ReactiveXComponent/Connection/IXCPublisher.cs b/ReactiveXComponent/Connection/IXCPublisher.cs index f38e37b..15e486b 100644 --- a/ReactiveXComponent/Connection/IXCPublisher.cs +++ b/ReactiveXComponent/Connection/IXCPublisher.cs @@ -10,7 +10,7 @@ public interface IXCPublisher : IDisposable void SendEvent(string stateMachine, object message, string messageType, Visibility visibility = Visibility.Public); void SendEvent(string stateMachine, object message, Visibility visibility = Visibility.Public); void SendEvent(StateMachineRefHeader stateMachineRefHeader, object message, Visibility visibility = Visibility.Public); - List GetSnapshot(string stateMachine, int? chunkSize, int timeout = 10000); - Task> GetSnapshotAsync(string stateMachine, int? chunkSize, int timeout = 10000); + List GetSnapshot(string stateMachine, int? chunkSize = null, int timeout = 10000); + Task> GetSnapshotAsync(string stateMachine, int? chunkSize = null, int timeout = 10000); } } \ No newline at end of file diff --git a/ReactiveXComponent/RabbitMq/RabbitMqSnapshotManager.cs b/ReactiveXComponent/RabbitMq/RabbitMqSnapshotManager.cs index 417e44f..6ff2956 100644 --- a/ReactiveXComponent/RabbitMq/RabbitMqSnapshotManager.cs +++ b/ReactiveXComponent/RabbitMq/RabbitMqSnapshotManager.cs @@ -73,7 +73,7 @@ private static SnapshotResponse AggregateChunks(ConcurrentBag GetSnapshot(string stateMachine, int? chunkSize, int timeout = 10000) + public List GetSnapshot(string stateMachine, int? chunkSize = null, int timeout = 10000) { var guid = Guid.NewGuid(); var requestId = guid.ToString(); @@ -182,7 +182,7 @@ public List GetSnapshot(string stateMachine, int? chunkSize, i return result; } - public Task> GetSnapshotAsync(string stateMachine, int? chunkSize, int timeout = 10000) + public Task> GetSnapshotAsync(string stateMachine, int? chunkSize = null, int timeout = 10000) { return Task.Run(() => GetSnapshot(stateMachine, chunkSize, timeout)); } diff --git a/ReactiveXComponent/WebSocket/WebSocketSnapshotManager.cs b/ReactiveXComponent/WebSocket/WebSocketSnapshotManager.cs index 0a62c71..21432ff 100644 --- a/ReactiveXComponent/WebSocket/WebSocketSnapshotManager.cs +++ b/ReactiveXComponent/WebSocket/WebSocketSnapshotManager.cs @@ -43,7 +43,7 @@ public WebSocketSnapshotManager(string component, IWebSocketClient webSocketClie h => SnapshotReceived -= h); } - public List GetSnapshot(string stateMachine, int? chunkSize, int timeout = 10000) + public List GetSnapshot(string stateMachine, int? chunkSize = null, int timeout = 10000) { var replyTopic = Guid.NewGuid().ToString(); @@ -84,7 +84,7 @@ public List GetSnapshot(string stateMachine, int? chunkSize, i return result; } - public Task> GetSnapshotAsync(string stateMachine, int? chunkSize, int timeout = 10000) + public Task> GetSnapshotAsync(string stateMachine, int? chunkSize = null, int timeout = 10000) { return Task.Run(() => GetSnapshot(stateMachine, chunkSize, timeout)); } From a9f6718d962ae0fae21b370f5259702f65f296d6 Mon Sep 17 00:00:00 2001 From: "anouar.hassine" Date: Mon, 30 Jul 2018 17:05:02 +0200 Subject: [PATCH 4/4] Removinig unnecessary using statements and namespaces --- ReactiveXComponent/Common/ChunkCountdown.cs | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/ReactiveXComponent/Common/ChunkCountdown.cs b/ReactiveXComponent/Common/ChunkCountdown.cs index f195f2a..dd6978c 100644 --- a/ReactiveXComponent/Common/ChunkCountdown.cs +++ b/ReactiveXComponent/Common/ChunkCountdown.cs @@ -1,15 +1,11 @@ using System; -using System.Collections.Generic; -using System.Linq; -using System.Text; using System.Threading; -using System.Threading.Tasks; namespace ReactiveXComponent.Common { public class ChunkCountdown : IDisposable { - private readonly ManualResetEvent _completionResetEvent = new System.Threading.ManualResetEvent(false); + private readonly ManualResetEvent _completionResetEvent = new ManualResetEvent(false); private const int ValueNotSet = int.MinValue; private int _countdown = ValueNotSet;