Skip to content

Commit

Permalink
[C#] State machine adjustments (#821)
Browse files Browse the repository at this point in the history
* State machine adjustments:
* Make sure ongoing multi-key ops drain out in PREPARE
* Correctly handle index resize

* add comment

* wire in CheckpointVersionSwitchBarrier correctly

* update

* push testcase

* add comment

* nit

* added yield

* nit

* fix test

* fix test

* refresh during AcquireLockable if needed

* clean up grow logic
  • Loading branch information
badrishc committed Apr 20, 2023
1 parent 5bace95 commit e25313b
Show file tree
Hide file tree
Showing 15 changed files with 262 additions and 62 deletions.
6 changes: 5 additions & 1 deletion cs/src/core/ClientSession/ClientSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,11 @@ internal void AcquireLockable()
{
// Checkpoints cannot complete while we have active locking sessions.
while (IsInPreparePhase())
{
if (fht.epoch.ThisInstanceProtected())
fht.InternalRefresh<Input, Output, Context, InternalFasterSession>(FasterSession);
Thread.Yield();
}

fht.IncrementNumLockingSessions();
isAcquiredLockable = true;
Expand Down Expand Up @@ -938,7 +942,7 @@ void IClientSession.AtomicSwitch(long version)
}

/// <summary>
/// Return true if Faster State Machine is in PREPARE sate
/// Return true if Faster State Machine is in PREPARE state
/// </summary>
internal bool IsInPreparePhase()
{
Expand Down
3 changes: 2 additions & 1 deletion cs/src/core/Index/Common/FasterKVSettings.cs
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,8 @@ internal CheckpointSettings GetCheckpointSettings()
CheckpointDir = CheckpointDir,
CheckpointManager = CheckpointManager,
RemoveOutdated = RemoveOutdatedCheckpoints,
ThrottleCheckpointFlushDelayMs = ThrottleCheckpointFlushDelayMs
ThrottleCheckpointFlushDelayMs = ThrottleCheckpointFlushDelayMs,
CheckpointVersionSwitchBarrier = CheckpointVersionSwitchBarrier
};
}

Expand Down
20 changes: 8 additions & 12 deletions cs/src/core/Index/FASTER/FASTER.cs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public partial class FasterKV<Key, Value> : FasterBase,

internal readonly bool DoTransientLocking; // uses LockTable
internal readonly bool DoEphemeralLocking; // uses RecordInfo
readonly bool CheckpointVersionSwitchBarrier; // version switch barrier
internal readonly bool CheckpointVersionSwitchBarrier; // version switch barrier
internal readonly OverflowBucketLockTable<Key, Value> LockTable;

internal void IncrementNumLockingSessions()
Expand Down Expand Up @@ -730,18 +730,14 @@ public bool GrowIndex()
while (true)
{
SystemState _systemState = SystemState.Copy(ref systemState);
if (_systemState.Phase == Phase.IN_PROGRESS_GROW)
{
SplitBuckets(0);
epoch.ProtectAndDrain();
}
else
{
SystemState.RemoveIntermediate(ref _systemState);
if (_systemState.Phase != Phase.PREPARE_GROW && _systemState.Phase != Phase.IN_PROGRESS_GROW)
break;
if (_systemState.Phase == Phase.PREPARE_GROW)
ThreadStateMachineStep<Empty, Empty, Empty, NullFasterSession>(null, NullFasterSession.Instance, default);
}
else if (_systemState.Phase == Phase.IN_PROGRESS_GROW)
SplitBuckets(0);
else if (_systemState.Phase == Phase.REST)
break;
epoch.ProtectAndDrain();
Thread.Yield();
}
}
finally
Expand Down
24 changes: 19 additions & 5 deletions cs/src/core/Index/FASTER/FASTERThread.cs
Original file line number Diff line number Diff line change
Expand Up @@ -75,18 +75,32 @@ public partial class FasterKV<Key, Value> : FasterBase, IFasterKV<Key, Value>
{
ThreadStateMachineStep(fasterSession.Ctx, fasterSession, default);

// In prepare phases, after draining out ongoing transactions,
// spin and get threads to reach the next version before proceeding

// In prepare phases, after draining out ongoing multi-key ops, we may spin and get threads to
// reach the next version before proceeding

// If CheckpointVersionSwitchBarrier is set, then:
// If system is in PREPARE phase AND all multi-key ops have drained (NumActiveLockingSessions == 0):
// Then (PREPARE, v) threads will SPIN during Refresh until they are in (IN_PROGRESS, v+1).
//
// That way no thread can work in the PREPARE phase while any thread works in IN_PROGRESS phase.
// This is safe, because the state machine is guaranteed to progress to (IN_PROGRESS, v+1) if all threads
// have reached PREPARE and all multi-key ops have drained (see VersionChangeTask.OnThreadState).
if (CheckpointVersionSwitchBarrier &&
fasterSession.Ctx.phase == Phase.PREPARE &&
hlog.NumActiveLockingSessions == 0)
{
epoch.ProtectAndDrain();
Thread.Yield();
continue;
}

if (fasterSession.Ctx.phase == Phase.PREPARE_GROW &&
hlog.NumActiveLockingSessions == 0)
{
epoch.ProtectAndDrain();
Thread.Yield();
continue;

}
break;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ public unsafe partial class FasterKV<Key, Value> : FasterBase, IFasterKV<Key, Va
{
OperationStackContext<Key, Value> stackCtx = new(comparer.GetHashCode64(ref key));

if (fasterSession.Ctx.phase != Phase.REST)
HeavyEnter(stackCtx.hei.hash, fasterSession.Ctx, fasterSession);
if (fasterSession.Ctx.phase == Phase.IN_PROGRESS_GROW)
SplitBuckets(stackCtx.hei.hash);

if (FindTag(ref stackCtx.hei))
{
Expand Down
18 changes: 0 additions & 18 deletions cs/src/core/Index/FASTER/Implementation/EpochOperations.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,24 +25,6 @@ public unsafe partial class FasterKV<Key, Value> : FasterBase, IFasterKV<Key, Va
pendingContext.version = sessionCtx.version;
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
private void HeavyEnter<Input, Output, Context, FasterSession>(long hash, FasterExecutionContext<Input, Output, Context> ctx, FasterSession session)
where FasterSession : IFasterSession<Key, Value, Input, Output, Context>
{
if (ctx.phase == Phase.PREPARE_GROW)
{
// We spin-wait as a simplification
// Could instead do a "heavy operation" here
while (systemState.Phase != Phase.IN_PROGRESS_GROW)
Thread.SpinWait(100);
InternalRefresh<Input, Output, Context, FasterSession>(session);
}
if (ctx.phase == Phase.IN_PROGRESS_GROW)
{
SplitBuckets(hash);
}
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
void SpinWaitUntilClosed(long address)
{
Expand Down
4 changes: 2 additions & 2 deletions cs/src/core/Index/FASTER/Implementation/InternalDelete.cs
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,8 @@ public unsafe partial class FasterKV<Key, Value> : FasterBase, IFasterKV<Key, Va

OperationStackContext<Key, Value> stackCtx = new(comparer.GetHashCode64(ref key));

if (fasterSession.Ctx.phase != Phase.REST)
HeavyEnter(stackCtx.hei.hash, fasterSession.Ctx, fasterSession);
if (fasterSession.Ctx.phase == Phase.IN_PROGRESS_GROW)
SplitBuckets(stackCtx.hei.hash);

var tagExists = FindTag(ref stackCtx.hei);
if (!tagExists)
Expand Down
4 changes: 2 additions & 2 deletions cs/src/core/Index/FASTER/Implementation/InternalRMW.cs
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,8 @@ public unsafe partial class FasterKV<Key, Value> : FasterBase, IFasterKV<Key, Va

OperationStackContext<Key, Value> stackCtx = new(comparer.GetHashCode64(ref key));

if (fasterSession.Ctx.phase != Phase.REST)
HeavyEnter(stackCtx.hei.hash, fasterSession.Ctx, fasterSession);
if (fasterSession.Ctx.phase == Phase.IN_PROGRESS_GROW)
SplitBuckets(stackCtx.hei.hash);

FindOrCreateTag(ref stackCtx.hei, hlog.BeginAddress);
stackCtx.SetRecordSourceToHashEntry(hlog);
Expand Down
4 changes: 2 additions & 2 deletions cs/src/core/Index/FASTER/Implementation/InternalRead.cs
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ public unsafe partial class FasterKV<Key, Value> : FasterBase, IFasterKV<Key, Va
{
OperationStackContext<Key, Value> stackCtx = new(comparer.GetHashCode64(ref key));

if (fasterSession.Ctx.phase != Phase.REST)
HeavyEnter(stackCtx.hei.hash, fasterSession.Ctx, fasterSession);
if (fasterSession.Ctx.phase == Phase.IN_PROGRESS_GROW)
SplitBuckets(stackCtx.hei.hash);

#region Trace back for record in readcache and in-memory HybridLog

Expand Down
4 changes: 2 additions & 2 deletions cs/src/core/Index/FASTER/Implementation/InternalUpsert.cs
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@ public unsafe partial class FasterKV<Key, Value> : FasterBase, IFasterKV<Key, Va

OperationStackContext<Key, Value> stackCtx = new(comparer.GetHashCode64(ref key));

if (fasterSession.Ctx.phase != Phase.REST)
HeavyEnter(stackCtx.hei.hash, fasterSession.Ctx, fasterSession);
if (fasterSession.Ctx.phase == Phase.IN_PROGRESS_GROW)
SplitBuckets(stackCtx.hei.hash);

FindOrCreateTag(ref stackCtx.hei, hlog.BeginAddress);
stackCtx.SetRecordSourceToHashEntry(hlog);
Expand Down
14 changes: 11 additions & 3 deletions cs/src/core/Index/Synchronization/FasterStateMachine.cs
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,8 @@ private bool MakeTransition(SystemState expectedState, SystemState nextState)
/// the given expected state.
/// </summary>
/// <param name="expectedState">expected current global state</param>
internal void GlobalStateMachineStep(SystemState expectedState)
/// <param name="bumpEpoch">whether we bump the epoch for the final state transition</param>
internal void GlobalStateMachineStep(SystemState expectedState, bool bumpEpoch = false)
{
// Between state transition, temporarily block any concurrent execution thread
// from progressing to prevent perceived inconsistencies
Expand All @@ -113,12 +114,20 @@ internal void GlobalStateMachineStep(SystemState expectedState)

var nextState = currentSyncStateMachine.NextState(expectedState);

if (bumpEpoch)
epoch.BumpCurrentEpoch(() => MakeTransitionWorker(intermediate, nextState));
else
MakeTransitionWorker(intermediate, nextState);
}

void MakeTransitionWorker(SystemState intermediate, SystemState nextState)
{
// Execute custom task logic
currentSyncStateMachine.GlobalBeforeEnteringState(nextState, this);
// Execute any additional callbacks in critical section
foreach (var callback in callbacks)
callback.BeforeEnteringState(nextState, this);

var success = MakeTransition(intermediate, nextState);
// Guaranteed to succeed, because other threads will always block while the system is in intermediate.
Debug.Assert(success);
Expand All @@ -128,7 +137,6 @@ internal void GlobalStateMachineStep(SystemState expectedState)
if (nextState.Phase == Phase.REST) stateMachineActive = 0;
}


// Given the current global state, return the starting point of the state machine cycle
[MethodImpl(MethodImplOptions.AggressiveInlining)]
private SystemState StartOfCurrentCycle(SystemState currentGlobalState)
Expand Down
4 changes: 3 additions & 1 deletion cs/src/core/Index/Synchronization/IndexResizeStateMachine.cs
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,10 @@ internal sealed class IndexResizeTask : ISynchronizationTask
switch (current.Phase)
{
case Phase.PREPARE_GROW:
// Using bumpEpoch: true allows us to guarantee that when system state proceeds, all threads in prior state
// will see that hlog.NumActiveLockingSessions == 0, ensuring that they can potentially block for the next state.
if (allThreadsInPrepareGrow && faster.hlog.NumActiveLockingSessions == 0)
faster.GlobalStateMachineStep(current);
faster.GlobalStateMachineStep(current, bumpEpoch: true);
break;

case Phase.IN_PROGRESS_GROW:
Expand Down
11 changes: 4 additions & 7 deletions cs/src/core/Index/Synchronization/VersionChangeStateMachine.cs
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,11 @@ internal sealed class VersionChangeTask : ISynchronizationTask
ctx.markers[EpochPhaseIdx.Prepare] = true;

faster.epoch.Mark(EpochPhaseIdx.Prepare, current.Version);

// Using bumpEpoch: true allows us to guarantee that when system state proceeds, all threads in prior state
// will see that hlog.NumActiveLockingSessions == 0, ensuring that they can potentially block for the next state.
if (faster.epoch.CheckIsComplete(EpochPhaseIdx.Prepare, current.Version) && faster.hlog.NumActiveLockingSessions == 0)
faster.GlobalStateMachineStep(current);
faster.GlobalStateMachineStep(current, bumpEpoch: faster.CheckpointVersionSwitchBarrier);
break;
case Phase.IN_PROGRESS:
if (ctx != null)
Expand Down Expand Up @@ -130,12 +133,6 @@ protected VersionChangeStateMachine(long targetVersion = -1, params ISynchroniza
this.targetVersion = targetVersion;
}

/// <summary>
/// Construct a new VersionChangeStateMachine that folds over the log at the end without waiting for flush.
/// </summary>
/// <param name="targetVersion">upper limit (inclusive) of the version included</param>
public VersionChangeStateMachine(long targetVersion = -1) : this(targetVersion, new VersionChangeTask(), new FoldOverTask()) { }

/// <inheritdoc />
public override SystemState NextState(SystemState start)
{
Expand Down
Loading

0 comments on commit e25313b

Please sign in to comment.