Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 36 additions & 0 deletions ReactiveXComponent/Common/ChunkCountdown.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
using System;
using System.Threading;

namespace ReactiveXComponent.Common
{
public class ChunkCountdown : IDisposable
{
private readonly ManualResetEvent _completionResetEvent = new ManualResetEvent(false);

private const int ValueNotSet = int.MinValue;
private int _countdown = ValueNotSet;

public void SetValueIfNotInitialized(int countdown)
{
Interlocked.CompareExchange(ref _countdown, countdown, ValueNotSet);
}

public void Decrement()
{
bool isCompleted = Interlocked.Decrement(ref _countdown) <= 0;
if (isCompleted)
{
_completionResetEvent.Set();
}
}

public WaitHandle CompletionResetEvent => _completionResetEvent;

public int Countdown => _countdown;

public void Dispose()
{
_completionResetEvent?.Dispose();
}
}
}
11 changes: 11 additions & 0 deletions ReactiveXComponent/Common/ChunkedSnapshotEvent.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
using System;

namespace ReactiveXComponent.Common
{
public class ChunkedSnapshotEvent
{
public string RequestId { get; set; }

public SnapshotResponseChunk SnapshotResponseChunk { get; set; }
}
}
62 changes: 62 additions & 0 deletions ReactiveXComponent/Common/GZipSnapshotItemArrayJsonConverter.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
using System;
using System.Collections.Generic;
using System.Globalization;
using System.IO;
using System.IO.Compression;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Newtonsoft.Json;

namespace ReactiveXComponent.Common
{
public class GZipSnapshotItemArrayJsonConverter<T> : JsonConverter
{
public override void WriteJson(JsonWriter writer, object value, JsonSerializer serializer)
{
var stringWriter = new StringWriter(new StringBuilder(256), CultureInfo.InvariantCulture);
using (var jsonTextWriter = new JsonTextWriter(stringWriter))
{
jsonTextWriter.Formatting = serializer.Formatting;
serializer.Serialize(jsonTextWriter, value);
var input = Encoding.UTF8.GetBytes(stringWriter.ToString());
using (var ms = new MemoryStream())
{
using (var compressedStream = new GZipStream(ms, CompressionMode.Compress))
{
compressedStream.Write(input, 0, input.Length);
compressedStream.Close();
}
var compressedBase64 = Convert.ToBase64String(ms.ToArray());
serializer.Serialize(writer, compressedBase64, typeof(string));
}
}
}

public override object ReadJson(JsonReader reader, Type objectType, object existingValue, JsonSerializer serializer)
{
var input = reader.Value as string;
if (input != null)
{
var compressedInput = Convert.FromBase64String(input);
using (var ms = new MemoryStream(compressedInput))
{
using (var decompressedStream = new GZipStream(ms, CompressionMode.Decompress))
{
using (var streamReader = new StreamReader(decompressedStream))
{
return serializer.Deserialize(streamReader, typeof(T));
}
}
}
}

return null;
}

public override bool CanConvert(Type objectType)
{
return typeof(T) == objectType;
}
}
}
12 changes: 12 additions & 0 deletions ReactiveXComponent/Common/SnapshotEvent.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
using System;
using System.Collections.Generic;

namespace ReactiveXComponent.Common
{
public class SnapshotEvent
{
public string RequestId { get; set; }

public SnapshotResponse SnapshotResponse { get; set; }
}
}
36 changes: 36 additions & 0 deletions ReactiveXComponent/Common/SnapshotItem.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace ReactiveXComponent.Common
{
public class SnapshotItem
{
public const int DefaultIntValue = -1;
public const string DefaultStringValue = "";

public string StateMachineId { get; set; }

public int WorkerId { get; set; }

public int StateMachineCode { get; set; }

public int StateCode { get; set; }

public int ComponentCode { get; set; }

public object PublicMember { get; set; }

public SnapshotItem()
{
StateMachineId = DefaultStringValue;
WorkerId = DefaultIntValue;
StateMachineCode = DefaultIntValue;
StateCode = DefaultIntValue;
ComponentCode = DefaultIntValue;
PublicMember = null;
}
}
}
11 changes: 0 additions & 11 deletions ReactiveXComponent/Common/SnapshotItems.cs

This file was deleted.

1 change: 1 addition & 0 deletions ReactiveXComponent/Common/SnapshotMessage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,6 @@ public class SnapshotMessage
public TimeSpan? Timeout { get; set; } = DefaultTimeout;
public List<string> CallerPrivateTopic { get; set; }
public string ReplyTopic { get; set; }
public int? ChunkSize { get; set; }
}
}
19 changes: 19 additions & 0 deletions ReactiveXComponent/Common/SnapshotResponse.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@

using System;
using System.Collections.Generic;
using Newtonsoft.Json;

namespace ReactiveXComponent.Common
{
[Serializable]
public class SnapshotResponse
{
[JsonConverter(typeof(GZipSnapshotItemArrayJsonConverter<List<SnapshotItem>>))]
public List<SnapshotItem> Items { get; set; }

public SnapshotResponse()
{
Items = new List<SnapshotItem>();
}
}
}
28 changes: 28 additions & 0 deletions ReactiveXComponent/Common/SnapshotResponseChunk.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
using System;
using System.Collections.Generic;

namespace ReactiveXComponent.Common
{
public class SnapshotResponseChunk
{
public SnapshotResponseChunk()
{
}

public SnapshotResponseChunk(string runtimeId, List<string> knownRuntimeIds)
{
RuntimeId = runtimeId;
KnownRuntimeIds = knownRuntimeIds;
}

public int ChunkCount { get; set; }

public int ChunkId { get; set; }

public string RuntimeId { get; set; }

public List<string> KnownRuntimeIds { get; set; }

public SnapshotResponse Response { get; set; }
}
}
2 changes: 1 addition & 1 deletion ReactiveXComponent/Common/StateMachineInstance.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ namespace ReactiveXComponent.Common
[Serializable]
public class StateMachineInstance
{
public long StateMachineId { get; set; }
public string StateMachineId { get; set; }

public int AgentId { get; set; }

Expand Down
4 changes: 2 additions & 2 deletions ReactiveXComponent/Common/StateMachineRefHeader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ public override int GetHashCode()
{
unchecked
{
var hashCode = StateMachineId.GetHashCode();
var hashCode = (StateMachineId != null ? StateMachineId.GetHashCode() : 0);
hashCode = (hashCode * 397) ^ StateCode;
hashCode = (hashCode * 397) ^ StateMachineCode;
hashCode = (hashCode * 397) ^ ComponentCode;
Expand All @@ -32,7 +32,7 @@ public override int GetHashCode()
}
}

public long StateMachineId { get; set; }
public string StateMachineId { get; set; }

public int StateCode { get; set; }

Expand Down
2 changes: 1 addition & 1 deletion ReactiveXComponent/Common/WebSocketEngineHeader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ namespace ReactiveXComponent.Common
/// </summary>
public class WebSocketEngineHeader
{
public long? StateMachineId { get; set; }
public string StateMachineId { get; set; }

public int? StateMachineCode { get; set; }

Expand Down
6 changes: 3 additions & 3 deletions ReactiveXComponent/Connection/IXCPublisher.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using ReactiveXComponent.Common;
using ReactiveXComponent.RabbitMq;

namespace ReactiveXComponent.Connection
{
Expand All @@ -10,7 +10,7 @@ public interface IXCPublisher : IDisposable
void SendEvent(string stateMachine, object message, string messageType, Visibility visibility = Visibility.Public);
void SendEvent(string stateMachine, object message, Visibility visibility = Visibility.Public);
void SendEvent(StateMachineRefHeader stateMachineRefHeader, object message, Visibility visibility = Visibility.Public);
List<MessageEventArgs> GetSnapshot(string stateMachine, int timeout = 10000);
void GetSnapshotAsync(string stateMachine, Action<List<MessageEventArgs>> onSnapshotReceived);
List<MessageEventArgs> GetSnapshot(string stateMachine, int? chunkSize = null, int timeout = 10000);
Task<List<MessageEventArgs>> GetSnapshotAsync(string stateMachine, int? chunkSize = null, int timeout = 10000);
}
}
8 changes: 4 additions & 4 deletions ReactiveXComponent/RabbitMq/RabbitMqHeaderConverter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public static Dictionary<string, object> CreateHeaderFromStateMachineRefHeader(S

var dico = new Dictionary<string, object>
{
{HeaderElement.StateMachineId, stateMachineRefHeader?.StateMachineId},
{HeaderElement.StateMachineId, stateMachineRefHeader?.StateMachineId != null ? Encoding.UTF8.GetBytes(stateMachineRefHeader.StateMachineId) : encoding.GetBytes(string.Empty)},
{HeaderElement.StateCode, stateMachineRefHeader?.StateCode},
{HeaderElement.StateMachineCode, stateMachineRefHeader?.StateMachineCode},
{HeaderElement.ComponentCode, stateMachineRefHeader?.ComponentCode},
Expand All @@ -48,7 +48,7 @@ public static Dictionary<string, object> CreateHeaderFromStateMachineRefHeader(S
public static StateMachineRefHeader ConvertStateMachineRefHeader(IDictionary<string,object> stateMachineRefHeader)
{
var encoding = new UTF8Encoding();
var stateMachineId = -1;
string stateMachineId = null;
var stateCode = -1;
var stateMachineCode = -1;
var componentCode = -1;
Expand All @@ -57,8 +57,8 @@ public static StateMachineRefHeader ConvertStateMachineRefHeader(IDictionary<str
var sessionData = string.Empty;
var errorMessage = string.Empty;

if (stateMachineRefHeader.ContainsKey(HeaderElement.StateMachineId))
stateMachineId = Convert.ToInt32(stateMachineRefHeader[HeaderElement.StateMachineId]);
if (stateMachineRefHeader.ContainsKey(HeaderElement.StateMachineId) && stateMachineRefHeader[HeaderElement.StateMachineId] != null)
stateMachineId = Encoding.UTF8.GetString((byte[])stateMachineRefHeader[HeaderElement.StateMachineId]);
if (stateMachineRefHeader.ContainsKey(HeaderElement.StateCode))
stateCode = Convert.ToInt32(stateMachineRefHeader[HeaderElement.StateCode]);
if (stateMachineRefHeader.ContainsKey(HeaderElement.StateMachineCode))
Expand Down
9 changes: 5 additions & 4 deletions ReactiveXComponent/RabbitMq/RabbitMqPublisher.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using System;
using System.Collections.Generic;
using System.IO;
using System.Threading.Tasks;
using RabbitMQ.Client;
using ReactiveXComponent.Common;
using ReactiveXComponent.Configuration;
Expand Down Expand Up @@ -69,14 +70,14 @@ public void SendEvent(StateMachineRefHeader stateMachineRefHeader, object messag
Send(message, routingKey, prop);
}

public List<MessageEventArgs> GetSnapshot(string stateMachine, int timeout = 10000)
public List<MessageEventArgs> GetSnapshot(string stateMachine, int? chunkSize, int timeout = 10000)
{
return _rabbitMqSnapshotManager.GetSnapshot(stateMachine, timeout);
return _rabbitMqSnapshotManager.GetSnapshot(stateMachine, chunkSize, timeout);
}

public void GetSnapshotAsync(string stateMachine, Action<List<MessageEventArgs>> onSnapshotReceived)
public Task<List<MessageEventArgs>> GetSnapshotAsync(string stateMachine, int? chunkSize, int timeout = 10000)
{
_rabbitMqSnapshotManager.GetSnapshotAsync(stateMachine, onSnapshotReceived);
return _rabbitMqSnapshotManager.GetSnapshotAsync(stateMachine, chunkSize, timeout);
}

#endregion
Expand Down
Loading