Skip to content

Commit

Permalink
updates
Browse files Browse the repository at this point in the history
  • Loading branch information
badrishc committed Sep 12, 2019
1 parent 78c2649 commit e170463
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 39 deletions.
19 changes: 7 additions & 12 deletions cs/src/core/ClientSession/FASTERAsync.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@ internal async ValueTask CompletePendingAsync(ClientSession<Key, Value, Input, O
{
bool done = true;

#region Previous pending requests
if (!RelaxedCPR)
{
#region Previous pending requests
if (clientSession.ctx.phase == Phase.IN_PROGRESS
||
clientSession.ctx.phase == Phase.WAIT_PENDING)
Expand All @@ -46,19 +46,14 @@ internal async ValueTask CompletePendingAsync(ClientSession<Key, Value, Input, O
done &= (clientSession.ctx.prevCtx.ioPendingRequests.Count == 0);
done &= (clientSession.ctx.prevCtx.retryRequests.Count == 0);
}
#endregion
}

if (RelaxedCPR || (!(clientSession.ctx.phase == Phase.IN_PROGRESS
||
clientSession.ctx.phase == Phase.WAIT_PENDING)))
{
await CompleteIOPendingRequestsAsync(clientSession.ctx, clientSession.ctx, clientSession);
Debug.Assert(clientSession.ctx.ioPendingRequests.Count == 0);
}
#endregion

await CompleteIOPendingRequestsAsync(clientSession.ctx, clientSession.ctx, clientSession);
CompleteRetryRequests(clientSession.ctx, clientSession.ctx, clientSession);

Debug.Assert(clientSession.ctx.ioPendingRequests.Count == 0);

done &= (clientSession.ctx.ioPendingRequests.Count == 0);
done &= (clientSession.ctx.retryRequests.Count == 0);

Expand Down Expand Up @@ -233,11 +228,11 @@ private async ValueTask HandleCheckpointingPhasesAsync(FasterExecutionContext ct
AtomicSwitch(ctx, ctx.prevCtx, _ctx.version);
InitContext(ctx, ctx.prevCtx.guid);

// Has to be prevThreadCtx, not ctx
// Has to be prevCtx, not ctx
ctx.prevCtx.markers[EpochPhaseIdx.InProgress] = true;
}

// Has to be prevThreadCtx, not ctx
// Has to be prevCtx, not ctx
if (epoch.MarkAndCheckIsComplete(EpochPhaseIdx.InProgress, ctx.prevCtx.version))
{
GlobalMoveToNextCheckpointState(currentState);
Expand Down
10 changes: 3 additions & 7 deletions cs/src/core/Index/FASTER/FASTER.cs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,6 @@ private enum CheckpointType

private ConcurrentDictionary<Guid, CommitPoint> _recoveredSessions;

private readonly FastThreadLocal<FasterExecutionContext> prevThreadCtx;
private readonly FastThreadLocal<FasterExecutionContext> threadCtx;


Expand All @@ -95,7 +94,6 @@ private enum CheckpointType
public FasterKV(long size, Functions functions, LogSettings logSettings, CheckpointSettings checkpointSettings = null, SerializerSettings<Key, Value> serializerSettings = null, IFasterEqualityComparer<Key> comparer = null, VariableLengthStructSettings<Key, Value> variableLengthStructSettings = null)
{
threadCtx = new FastThreadLocal<FasterExecutionContext>();
prevThreadCtx = new FastThreadLocal<FasterExecutionContext>();

if (comparer != null)
this.comparer = comparer;
Expand Down Expand Up @@ -294,7 +292,6 @@ public Guid StartSession()
return InternalAcquire();
}


/// <summary>
/// Continue session with FASTER
/// </summary>
Expand Down Expand Up @@ -344,10 +341,10 @@ public bool CompletePending(bool wait = false)
public IEnumerable<long> GetPendingRequests()
{

foreach (var kvp in prevThreadCtx.Value.ioPendingRequests)
foreach (var kvp in threadCtx.Value.prevCtx?.ioPendingRequests)
yield return kvp.Value.serialNum;

foreach (var val in prevThreadCtx.Value.retryRequests)
foreach (var val in threadCtx.Value.prevCtx?.retryRequests)
yield return val.serialNum;

foreach (var kvp in threadCtx.Value.ioPendingRequests)
Expand Down Expand Up @@ -548,8 +545,7 @@ public bool GrowIndex()
public void Dispose()
{
base.Free();
threadCtx.Dispose();
prevThreadCtx.Dispose();
threadCtx?.Dispose();
hlog.Dispose();
readcache?.Dispose();
}
Expand Down
30 changes: 10 additions & 20 deletions cs/src/core/Index/FASTER/FASTERThread.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ internal Guid InternalAcquire()
{
epoch.Acquire();
threadCtx.InitializeThread();
prevThreadCtx.InitializeThread();
Phase phase = _systemState.phase;
if (phase != Phase.REST)
{
Expand All @@ -30,10 +29,10 @@ internal Guid InternalAcquire()
Guid guid = Guid.NewGuid();
threadCtx.Value = new FasterExecutionContext();
InitContext(threadCtx.Value, guid);
prevThreadCtx.Value = new FasterExecutionContext();
InitContext(prevThreadCtx.Value, guid);
prevThreadCtx.Value.version--;
threadCtx.Value.prevCtx = prevThreadCtx.Value;

threadCtx.Value.prevCtx = new FasterExecutionContext();
InitContext(threadCtx.Value.prevCtx, guid);
threadCtx.Value.prevCtx.version--;
InternalRefresh(threadCtx.Value);
return threadCtx.Value.guid;
}
Expand Down Expand Up @@ -183,33 +182,24 @@ internal bool InternalCompletePending(FasterExecutionContext ctx, bool wait = fa
{
bool done = true;

#region Previous pending requests
if (!RelaxedCPR)
{
#region Previous pending requests
if (ctx.phase == Phase.IN_PROGRESS
||
ctx.phase == Phase.WAIT_PENDING)
if (ctx.phase == Phase.IN_PROGRESS || ctx.phase == Phase.WAIT_PENDING)
{
CompleteIOPendingRequests(ctx.prevCtx, ctx);
epoch.ProtectAndDrain(); // incorrect?
// InternalRefresh();
CompleteRetryRequests(ctx.prevCtx, ctx);
InternalRefresh(ctx);

done &= (ctx.prevCtx.ioPendingRequests.Count == 0);
done &= (ctx.prevCtx.retryRequests.Count == 0);
}
#endregion
}
#endregion

if (!(ctx.phase == Phase.IN_PROGRESS
||
ctx.phase == Phase.WAIT_PENDING))
{
CompleteIOPendingRequests(ctx, ctx);
}
epoch.ProtectAndDrain(); // incorrect?
// InternalRefresh();
CompleteIOPendingRequests(ctx, ctx);
CompleteRetryRequests(ctx, ctx);
InternalRefresh(ctx);

done &= (ctx.ioPendingRequests.Count == 0);
done &= (ctx.retryRequests.Count == 0);
Expand Down

0 comments on commit e170463

Please sign in to comment.