Skip to content

Commit

Permalink
[C#] Guard against info being reset by global state machine (during c…
Browse files Browse the repository at this point in the history
…heckpointing) (#311)

* Fix to checkpoint tokens race
* stricter condition
* Guard info being reset by global machine.
  • Loading branch information
badrishc committed Sep 3, 2020
1 parent a0880d7 commit 0617852
Show file tree
Hide file tree
Showing 4 changed files with 10 additions and 7 deletions.
2 changes: 1 addition & 1 deletion cs/src/core/ClientSession/ClientSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -443,7 +443,7 @@ internal void UnsafeSuspendThread()

void IClientSession.AtomicSwitch(int version)
{
fht.AtomicSwitch(ctx, ctx.prevCtx, version);
fht.AtomicSwitch(ctx, ctx.prevCtx, version, fht._hybridLogCheckpoint.info.checkpointTokens);
}

/// <summary>
Expand Down
5 changes: 3 additions & 2 deletions cs/src/core/ClientSession/FASTERAsync.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#pragma warning disable 0162

using System;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Runtime.CompilerServices;
using System.Runtime.ExceptionServices;
Expand Down Expand Up @@ -268,7 +269,7 @@ private static async ValueTask<ReadAsyncResult<Input, Output, Context, Functions
return new ReadAsyncResult<Input, Output, Context, Functions>(@this, clientSession, pendingContext, diskRequest);
}

internal bool AtomicSwitch<Input, Output, Context>(FasterExecutionContext<Input, Output, Context> fromCtx, FasterExecutionContext<Input, Output, Context> toCtx, int version)
internal bool AtomicSwitch<Input, Output, Context>(FasterExecutionContext<Input, Output, Context> fromCtx, FasterExecutionContext<Input, Output, Context> toCtx, int version, ConcurrentDictionary<string, CommitPoint> tokens)
{
lock (toCtx)
{
Expand All @@ -277,7 +278,7 @@ internal bool AtomicSwitch<Input, Output, Context>(FasterExecutionContext<Input,
CopyContext(fromCtx, toCtx);
if (toCtx.serialNum != -1)
{
_hybridLogCheckpoint.info.checkpointTokens.TryAdd(toCtx.guid,
tokens.TryAdd(toCtx.guid,
new CommitPoint
{
UntilSerialNo = toCtx.serialNum,
Expand Down
5 changes: 2 additions & 3 deletions cs/src/core/Index/Synchronization/HybridLogCheckpointTask.cs
Original file line number Diff line number Diff line change
Expand Up @@ -145,11 +145,10 @@ public override void OnThreadState<Key, Value, Input, Output, Context, FasterSes
var s = faster._hybridLogCheckpoint.flushedSemaphore;

var notify = faster.hlog.FlushedUntilAddress >= faster._hybridLogCheckpoint.info.finalLogicalAddress;
notify = notify || !faster.SameCycle(current);
notify = notify || !faster.SameCycle(current) || s == null;

if (valueTasks != null && !notify)
{
Debug.Assert(s != null);
valueTasks.Add(new ValueTask(s.WaitAsync(token).ContinueWith(t => s.Release())));
}

Expand Down Expand Up @@ -233,7 +232,7 @@ public override void OnThreadState<Key, Value, Input, Output, Context, FasterSes
var s = faster._hybridLogCheckpoint.flushedSemaphore;

var notify = s != null && s.CurrentCount > 0;
notify = notify || !faster.SameCycle(current);
notify = notify || !faster.SameCycle(current) || s == null;

if (valueTasks != null && !notify)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,13 @@ public void OnThreadState<Key, Value, Input, Output, Context, FasterSession>(
{
// Need to be very careful here as threadCtx is changing
var _ctx = prev.phase == Phase.IN_PROGRESS ? ctx.prevCtx : ctx;
var tokens = faster._hybridLogCheckpoint.info.checkpointTokens;
if (!faster.SameCycle(current) || tokens == null)
return;

if (!_ctx.markers[EpochPhaseIdx.InProgress])
{
faster.AtomicSwitch(ctx, ctx.prevCtx, _ctx.version);
faster.AtomicSwitch(ctx, ctx.prevCtx, _ctx.version, tokens);
faster.InitContext(ctx, ctx.prevCtx.guid, ctx.prevCtx.serialNum);

// Has to be prevCtx, not ctx
Expand Down

0 comments on commit 0617852

Please sign in to comment.