Skip to content

Commit

Permalink
Persist dormant sessions correctly.
Browse files Browse the repository at this point in the history
  • Loading branch information
badrishc committed Aug 8, 2019
1 parent dbe13fd commit 1754b68
Show file tree
Hide file tree
Showing 6 changed files with 35 additions and 12 deletions.
4 changes: 2 additions & 2 deletions cs/src/core/ClientSession/ClientSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ public class ClientSession<Key, Value, Input, Output, Context, Functions> : IDis
where Functions : IFunctions<Key, Value, Input, Output, Context>
{
private FasterKV<Key, Value, Input, Output, Context, Functions> fht;
private FasterKV<Key, Value, Input, Output, Context, Functions>.FasterExecutionContext prevCtx;
private FasterKV<Key, Value, Input, Output, Context, Functions>.FasterExecutionContext ctx;
internal FasterKV<Key, Value, Input, Output, Context, Functions>.FasterExecutionContext prevCtx;
internal FasterKV<Key, Value, Input, Output, Context, Functions>.FasterExecutionContext ctx;

internal ClientSession(
FasterKV<Key, Value, Input, Output, Context, Functions> fht,
Expand Down
19 changes: 16 additions & 3 deletions cs/src/core/ClientSession/FASTERAsync.cs
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,21 @@ internal async ValueTask InternalRefreshAsync()
await HandleCheckpointingPhasesAsync();
}


private bool AtomicSwitch(FasterExecutionContext fromCtx, FasterExecutionContext toCtx, int version)
{
lock (toCtx)
{
if (toCtx.version < version)
{
CopyContext(fromCtx, toCtx);
_hybridLogCheckpoint.info.checkpointTokens.TryAdd(toCtx.guid, toCtx.serialNum);
return true;
}
}
return false;
}

private async ValueTask HandleCheckpointingPhasesAsync()
{
var previousState = SystemState.Make(threadCtx.Value.phase, threadCtx.Value.version);
Expand Down Expand Up @@ -225,7 +240,7 @@ private async ValueTask HandleCheckpointingPhasesAsync()

if (!ctx.markers[EpochPhaseIdx.InProgress])
{
CopyContext(threadCtx.Value, prevThreadCtx.Value);
AtomicSwitch(threadCtx.Value, prevThreadCtx.Value, ctx.version);
InitContext(threadCtx.Value, prevThreadCtx.Value.guid);

if (epoch.MarkAndCheckIsComplete(EpochPhaseIdx.InProgress, ctx.version))
Expand Down Expand Up @@ -311,8 +326,6 @@ private async ValueTask HandleCheckpointingPhasesAsync()

if (notify)
{
_hybridLogCheckpoint.info.checkpointTokens.TryAdd(prevThreadCtx.Value.guid, prevThreadCtx.Value.serialNum);

if (epoch.MarkAndCheckIsComplete(EpochPhaseIdx.WaitFlush, prevThreadCtx.Value.version))
{
GlobalMoveToNextCheckpointState(currentState);
Expand Down
1 change: 1 addition & 0 deletions cs/src/core/ClientSession/FASTERClientSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ public ClientSession<Key, Value, Input, Output, Context, Functions> StartClientS
InitContext(ctx, guid);
var prevCtx = new FasterExecutionContext();
InitContext(prevCtx, guid);
prevCtx.version--;
if (_activeSessions == null)
Interlocked.CompareExchange(ref _activeSessions, new Dictionary<Guid, ClientSession<Key, Value, Input, Output, Context, Functions>>(), null);
var session = new ClientSession<Key, Value, Input, Output, Context, Functions>(this, prevCtx, ctx);
Expand Down
9 changes: 5 additions & 4 deletions cs/src/core/Index/Common/Contexts.cs
Original file line number Diff line number Diff line change
Expand Up @@ -281,11 +281,12 @@ public byte[] ToByteArray()
writer.WriteLine(startLogicalAddress);
writer.WriteLine(finalLogicalAddress);
writer.WriteLine(headAddress);
writer.WriteLine(numThreads);
for (int i = 0; i < numThreads; i++)
writer.WriteLine(checkpointTokens.Count);

foreach (var kvp in checkpointTokens)
{
writer.WriteLine(guids[i]);
writer.WriteLine(checkpointTokens[guids[i]]);
writer.WriteLine(kvp.Key);
writer.WriteLine(kvp.Value);
}

// Write object log segment offsets
Expand Down
2 changes: 2 additions & 0 deletions cs/src/core/Index/FASTER/FASTERThread.cs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ internal Guid InternalAcquire()
InitContext(threadCtx.Value, guid);
prevThreadCtx.Value = new FasterExecutionContext();
InitContext(prevThreadCtx.Value, guid);
prevThreadCtx.Value.version--;
InternalRefresh();
return threadCtx.Value.guid;
}
Expand Down Expand Up @@ -62,6 +63,7 @@ internal long InternalContinue(Guid guid)
InitContext(threadCtx.Value, guid);
prevThreadCtx.Value = new FasterExecutionContext();
InitContext(prevThreadCtx.Value, guid);
prevThreadCtx.Value.version--;
threadCtx.Value.serialNum = serialNum;
InternalRefresh();
}
Expand Down
12 changes: 9 additions & 3 deletions cs/src/core/Index/Recovery/Checkpoint.cs
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,14 @@ private bool GlobalMoveToNextState(SystemState currentState, SystemState nextSta
}
case Phase.PERSISTENCE_CALLBACK:
{
if (_activeSessions != null)
{
// write dormant sessions to checkpoint
foreach (var kvp in _activeSessions)
{
AtomicSwitch(kvp.Value.ctx, kvp.Value.prevCtx, currentState.version - 1);
}
}
WriteHybridLogMetaInfo();

if (_checkpointType == CheckpointType.FULL)
Expand Down Expand Up @@ -495,7 +503,7 @@ private void HandleCheckpointingPhases()

if (!ctx.markers[EpochPhaseIdx.InProgress])
{
CopyContext(threadCtx.Value, prevThreadCtx.Value);
AtomicSwitch(threadCtx.Value, prevThreadCtx.Value, ctx.version);
InitContext(threadCtx.Value, prevThreadCtx.Value.guid);

if (epoch.MarkAndCheckIsComplete(EpochPhaseIdx.InProgress, ctx.version))
Expand Down Expand Up @@ -546,8 +554,6 @@ private void HandleCheckpointingPhases()

if (notify)
{
_hybridLogCheckpoint.info.checkpointTokens.TryAdd(prevThreadCtx.Value.guid, prevThreadCtx.Value.serialNum);

if (epoch.MarkAndCheckIsComplete(EpochPhaseIdx.WaitFlush, prevThreadCtx.Value.version))
{
GlobalMoveToNextCheckpointState(currentState);
Expand Down

0 comments on commit 1754b68

Please sign in to comment.