Skip to content

Commit

Permalink
Merge pull request #65 from kuzzleio/2.0.5-proposal
Browse files Browse the repository at this point in the history
# [2.0.5](https://github.com/kuzzleio/sdk-csharp/releases/tag/2.0.5) (2020-09-29)


#### Bug fixes

- [ [#64](#64) ] Fix network recovery   ([scottinet](https://github.com/scottinet))
---
  • Loading branch information
scottinet committed Sep 29, 2020
2 parents b253d47 + c1a4ee7 commit 37835cd
Show file tree
Hide file tree
Showing 9 changed files with 144 additions and 75 deletions.
24 changes: 12 additions & 12 deletions Kuzzle.Tests/Offline/Query/QueryReplayerTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -147,12 +147,12 @@ public class QueryReplayerTest {
testableOfflineManager.MaxQueueSize = -1;
queryReplayer.Lock = false;

Assert.True(queryReplayer.Enqueue(JObject.Parse("{controller: 'foo', action: 'bar'}")));
Assert.True(queryReplayer.Enqueue(JObject.Parse("{controller: 'bar', action: 'foor'}")));
Assert.True(queryReplayer.Enqueue(JObject.Parse("{controller: 'foobar', action: 'foobar'}")));
Assert.True(queryReplayer.Enqueue(JObject.Parse("{controller: 'barfoo', action: 'foobar'}")));
Assert.True(queryReplayer.Enqueue(JObject.Parse("{controller: 'foobar', action: 'barfoo'}")));
Assert.True(queryReplayer.Enqueue(JObject.Parse("{controller: 'barfoo', action: 'barfoo'}")));
Assert.True(queryReplayer.Enqueue(JObject.Parse("{requestId: '1', controller: 'foo', action: 'bar'}")));
Assert.True(queryReplayer.Enqueue(JObject.Parse("{requestId: '2', controller: 'bar', action: 'foor'}")));
Assert.True(queryReplayer.Enqueue(JObject.Parse("{requestId: '3', controller: 'foobar', action: 'foobar'}")));
Assert.True(queryReplayer.Enqueue(JObject.Parse("{requestId: '4', controller: 'barfoo', action: 'foobar'}")));
Assert.True(queryReplayer.Enqueue(JObject.Parse("{requestId: '5', controller: 'foobar', action: 'barfoo'}")));
Assert.True(queryReplayer.Enqueue(JObject.Parse("{requestId: '6', controller: 'barfoo', action: 'barfoo'}")));

Assert.Equal(6, queryReplayer.Count);

Expand All @@ -168,12 +168,12 @@ public class QueryReplayerTest {
testableOfflineManager.MaxQueueSize = -1;
queryReplayer.Lock = false;

Assert.True(queryReplayer.Enqueue(JObject.Parse("{controller: 'foo', action: 'bar'}")));
Assert.True(queryReplayer.Enqueue(JObject.Parse("{controller: 'bar', action: 'foor'}")));
Assert.True(queryReplayer.Enqueue(JObject.Parse("{controller: 'foobar', action: 'foobar'}")));
Assert.True(queryReplayer.Enqueue(JObject.Parse("{controller: 'barfoo', action: 'foobar'}")));
Assert.True(queryReplayer.Enqueue(JObject.Parse("{controller: 'foobar', action: 'barfoo'}")));
Assert.True(queryReplayer.Enqueue(JObject.Parse("{controller: 'barfoo', action: 'barfoo'}")));
Assert.True(queryReplayer.Enqueue(JObject.Parse("{requestId: '1', controller: 'foo', action: 'bar'}")));
Assert.True(queryReplayer.Enqueue(JObject.Parse("{requestId: '2', controller: 'bar', action: 'foor'}")));
Assert.True(queryReplayer.Enqueue(JObject.Parse("{requestId: '3', controller: 'foobar', action: 'foobar'}")));
Assert.True(queryReplayer.Enqueue(JObject.Parse("{requestId: '4', controller: 'barfoo', action: 'foobar'}")));
Assert.True(queryReplayer.Enqueue(JObject.Parse("{requestId: '5', controller: 'foobar', action: 'barfoo'}")));
Assert.True(queryReplayer.Enqueue(JObject.Parse("{requestId: '6', controller: 'barfoo', action: 'barfoo'}")));

Assert.Equal(6, queryReplayer.Count);

Expand Down
67 changes: 43 additions & 24 deletions Kuzzle/Kuzzle.cs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ internal interface IKuzzle {
public sealed class Kuzzle : IKuzzleApi, IKuzzle {
private AbstractProtocol networkProtocol;

private SemaphoreSlim requestsSemaphore = new SemaphoreSlim(1, 1);

internal readonly Dictionary<string, TaskCompletionSource<Response>>
requests = new Dictionary<string, TaskCompletionSource<Response>>();

Expand Down Expand Up @@ -220,7 +222,6 @@ public sealed class Kuzzle : IKuzzleApi, IKuzzle {
}
}


/// <summary>
/// Handles the ResponseEvent event from the network protocol
/// </summary>
Expand All @@ -229,39 +230,55 @@ public sealed class Kuzzle : IKuzzleApi, IKuzzle {
internal void ResponsesListener(object sender, string payload) {
Response response = Response.FromString(payload);

if (requests.ContainsKey(response.Room)) {
if (response.Error != null) {
if (response.Error.Message == "Token expired") {
EventHandler.DispatchTokenExpired();
}
if (!requests.ContainsKey(response.Room)) {
EventHandler.DispatchUnhandledResponse(response);
return;
}

requests[response.RequestId].SetException(
new Exceptions.ApiErrorException(response));
} else {
requests[response.RequestId].SetResult(response);
}
TaskCompletionSource<Response> task = requests[response.RequestId];

lock (requests) {
requests.Remove(response.RequestId);
if (response.Error != null) {
if (response.Error.Message == "Token expired") {
EventHandler.DispatchTokenExpired();
}

Offline?.QueryReplayer?.Remove((obj) => obj["requestId"].ToString() == response.RequestId);
task.SetException(new Exceptions.ApiErrorException(response));
}
else {
task.SetResult(response);
}

} else {
EventHandler.DispatchUnhandledResponse(response);
requestsSemaphore.Wait();
try {
requests.Remove(response.RequestId);
}
finally {
requestsSemaphore.Release();
}

Offline?.QueryReplayer?.Remove(
(obj) => obj["requestId"].ToString() == response.RequestId);
}

internal void StateChangeListener(object sender, ProtocolState state) {
// If not connected anymore: close tasks and clean up the requests buffer
if (state == ProtocolState.Closed) {
lock (requests) {
// If not connected anymore: close pending tasks and clean up the requests
// buffer.
// If reconnecting, only requests submitted AFTER the disconnection event
// can be queued: we have no information about requests submitted before
// that event. For all we know, Kuzzle could have received & processed
// those requests, but couldn't forward the response to us
if (state == ProtocolState.Closed || state == ProtocolState.Reconnecting) {
requestsSemaphore.Wait();
try {
foreach (var task in requests.Values) {
task.SetException(new Exceptions.ConnectionLostException());
}

requests.Clear();
}
finally {
requestsSemaphore.Release();
}
}
}

Expand Down Expand Up @@ -382,14 +399,16 @@ public sealed class Kuzzle : IKuzzleApi, IKuzzle {
query["volatile"]["sdkName"] = SdkName;
query["volatile"]["sdkInstanceId"] = InstanceId;

requestsSemaphore.Wait();
requests[requestId] = new TaskCompletionSource<Response>(
TaskCreationOptions.RunContinuationsAsynchronously);
requestsSemaphore.Release();

if (NetworkProtocol.State == ProtocolState.Open) {
NetworkProtocol.Send(query);
} else if (NetworkProtocol.State == ProtocolState.Reconnecting) {
Offline.QueryReplayer.Enqueue(query);
}

lock (requests) {
requests[requestId] = new TaskCompletionSource<Response>();
else if (NetworkProtocol.State == ProtocolState.Reconnecting) {
Offline.QueryReplayer.Enqueue(query);
}

return requests[requestId].Task.ConfigureAwait(false);
Expand Down
4 changes: 4 additions & 0 deletions Kuzzle/Kuzzle.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@
<MonoDevelop>
<Properties>
<Deployment.LinuxDeployData generatePcFile="False" />
<Policies>
<TextStylePolicy RemoveTrailingWhitespace="True" NoTabsAfterNonTabs="False" EolMarker="Native" FileWidth="80" TabWidth="2" TabsToSpaces="True" IndentWidth="2" scope="text/x-csharp" />
<CSharpFormattingPolicy IndentBlock="True" IndentBraces="False" IndentSwitchSection="True" IndentSwitchCaseSection="True" LabelPositioning="OneLess" NewLineForElse="True" NewLineForCatch="True" NewLineForFinally="True" NewLineForMembersInObjectInit="True" NewLineForMembersInAnonymousTypes="True" NewLineForClausesInQuery="True" SpaceWithinMethodDeclarationParenthesis="False" SpaceBetweenEmptyMethodDeclarationParentheses="False" SpaceAfterMethodCallName="False" SpaceWithinMethodCallParentheses="False" SpaceBetweenEmptyMethodCallParentheses="False" SpaceAfterControlFlowStatementKeyword="True" SpaceWithinExpressionParentheses="False" SpaceWithinCastParentheses="False" SpaceWithinOtherParentheses="False" SpaceAfterCast="False" SpacesIgnoreAroundVariableDeclaration="False" SpaceBeforeOpenSquareBracket="False" SpaceBetweenEmptySquareBrackets="False" SpaceWithinSquareBrackets="False" SpaceAfterColonInBaseTypeDeclaration="True" SpaceAfterComma="True" SpaceAfterDot="False" SpaceAfterSemicolonsInForStatement="True" SpaceBeforeColonInBaseTypeDeclaration="True" SpaceBeforeComma="False" SpaceBeforeDot="False" SpaceBeforeSemicolonsInForStatement="False" SpacingAroundBinaryOperator="Single" WrappingPreserveSingleLine="True" WrappingKeepStatementsOnSingleLine="True" PlaceSystemDirectiveFirst="True" NewLinesForBracesInTypes="False" NewLinesForBracesInMethods="False" NewLinesForBracesInProperties="False" NewLinesForBracesInAccessors="False" NewLinesForBracesInAnonymousMethods="False" NewLinesForBracesInControlBlocks="False" NewLinesForBracesInAnonymousTypes="False" NewLinesForBracesInObjectCollectionArrayInitializers="False" NewLinesForBracesInLambdaExpressionBody="False" SpacingAfterMethodDeclarationName="True" scope="text/x-csharp" />
</Policies>
</Properties>
</MonoDevelop>
</ProjectExtensions>
Expand Down
2 changes: 1 addition & 1 deletion Kuzzle/Kuzzle.nuspec
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
<metadata>
<id>kuzzlesdk</id>
<title>Kuzzle SDK</title>
<version>2.0.4</version>
<version>2.0.5</version>
<authors>Kuzzle Team</authors>
<owners>Kuzzle Team</owners>
<license type="expression">Apache-2.0</license>
Expand Down
2 changes: 1 addition & 1 deletion Kuzzle/Offline/OfflineManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,6 @@ public class OfflineManager : IOfflineManager {

internal void StateChangeListener(object sender, ProtocolState state) {
if (state == ProtocolState.Open && previousState == ProtocolState.Reconnecting) {

kuzzle.GetEventHandler().DispatchReconnected();

Task.Run(async () => {
Expand All @@ -235,6 +234,7 @@ public class OfflineManager : IOfflineManager {
});

}

previousState = state;
}

Expand Down
73 changes: 54 additions & 19 deletions Kuzzle/Offline/Query/QueryReplayer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using KuzzleSdk.API;
using KuzzleSdk.API.Offline;
using Newtonsoft.Json.Linq;

Expand Down Expand Up @@ -73,6 +74,7 @@ internal sealed class QueryReplayer : IQueryReplayer {
private CancellationTokenSource cancellationTokenSource;
private bool currentlyReplaying = false;
private Stopwatch stopWatch = new Stopwatch();
private SemaphoreSlim queueSemaphore = new SemaphoreSlim(1, 1);

/// <summary>
/// Tells if the QueryReplayer is locked (i.e. it doesn't accept new queries).
Expand Down Expand Up @@ -103,28 +105,35 @@ internal sealed class QueryReplayer : IQueryReplayer {
public bool Enqueue(JObject query) {
if (Lock || WaitLoginToReplay) return false;

lock (queue) {
queueSemaphore.Wait();
try {
if (queue.Count < offlineManager.MaxQueueSize || offlineManager.MaxQueueSize < 0) {
if (queue.Count == 0) {
stopWatch.Reset();
stopWatch.Start();
queue.Add(new TimedQuery(query, 0));
} else {
}
else {
TimedQuery previous = queue[queue.Count - 1];
Int64 elapsedTime = stopWatch.ElapsedMilliseconds - previous.Time;
elapsedTime = Math.Min(elapsedTime, offlineManager.MaxRequestDelay);
queue.Add(new TimedQuery(query, previous.Time + elapsedTime));
}
if (query["controller"]?.ToString() == "auth"
&& (query["action"]?.ToString() == "login"
|| query["action"]?.ToString() == "logout")
) {
Lock = true;
}

String controller = query["controller"]?.ToString();
String action = query["action"]?.ToString();

if (controller == "auth" && (action == "login" || action == "logout")) {
Lock = true;
}

return true;
}
}
finally {
queueSemaphore.Release();
}

return false;
}

Expand All @@ -139,7 +148,8 @@ internal sealed class QueryReplayer : IQueryReplayer {
/// Remove and return the first query that has been added to the queue.
/// </summary>
public JObject Dequeue() {
lock (queue) {
queueSemaphore.Wait();
try {
if (queue.Count == 0) {
return null;
}
Expand All @@ -149,6 +159,9 @@ internal sealed class QueryReplayer : IQueryReplayer {

return query;
}
finally {
queueSemaphore.Release();
}
}

/// <summary>
Expand All @@ -163,27 +176,42 @@ internal sealed class QueryReplayer : IQueryReplayer {
/// it is set with an exception and removed from the replayable queue.
/// </summary>
public void RejectQueries(Predicate<JObject> predicate, Exception exception) {
lock (queue) {
queueSemaphore.Wait();
try {
foreach (TimedQuery timedQuery in queue) {
if (predicate(timedQuery.Query)) {
kuzzle.GetRequestById(timedQuery.Query["requestId"]?.ToString())?.SetException(exception);
String requestId = timedQuery.Query["requestId"]?.ToString();

if (requestId != null) {
TaskCompletionSource<Response> task = kuzzle.GetRequestById(requestId);

if (task != null) {
task.SetException(exception);
}
}
}
}

queue.RemoveAll((obj) => predicate(obj.Query));

if (queue.Count == 0) {
Lock = false;
currentlyReplaying = false;
WaitLoginToReplay = false;
}
}
finally {
queueSemaphore.Release();
}
}

/// <summary>
/// Remove every query that satisfies the predicate
/// </summary>
/// <returns>How many items where removed.</returns>
public int Remove(Predicate<JObject> predicate) {
lock (queue) {
queueSemaphore.Wait();
try {
if (queue.Count > 0) {
Predicate<TimedQuery> timedQueryPredicate = timedQuery => predicate(timedQuery.Query);
int itemsRemoved = queue.RemoveAll(timedQueryPredicate);
Expand All @@ -196,19 +224,22 @@ internal sealed class QueryReplayer : IQueryReplayer {
return itemsRemoved;
}
}
finally {
queueSemaphore.Release();
}
return 0;
}

/// <summary>
/// Clear the queue.
/// </summary>
public void Clear() {
lock (queue) {
queue.Clear();
Lock = false;
currentlyReplaying = false;
WaitLoginToReplay = false;
}
queueSemaphore.Wait();
queue.Clear();
Lock = false;
currentlyReplaying = false;
WaitLoginToReplay = false;
queueSemaphore.Release();
}

internal delegate Task ReplayQueryFunc(TimedQuery timedQuery, CancellationToken cancellationToken);
Expand Down Expand Up @@ -255,7 +286,8 @@ internal sealed class QueryReplayer : IQueryReplayer {

if (resetWaitLogin) WaitLoginToReplay = false;

lock (queue) {
queueSemaphore.Wait();
try {
if (queue.Count > 0) {
currentlyReplaying = true;

Expand All @@ -267,6 +299,9 @@ internal sealed class QueryReplayer : IQueryReplayer {

}
}
finally {
queueSemaphore.Release();
}
return cancellationTokenSource;
}

Expand Down
Loading

0 comments on commit 37835cd

Please sign in to comment.