Skip to content

Commit

Permalink
trying to reduce async context - incomplete
Browse files Browse the repository at this point in the history
  • Loading branch information
badrishc committed Aug 17, 2019
1 parent 5cda2f9 commit 68df34d
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 57 deletions.
8 changes: 0 additions & 8 deletions cs/src/core/ClientSession/ClientSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -152,11 +152,7 @@ public bool CompletePending(bool spinWait = false)
/// <returns></returns>
public async ValueTask CompletePendingAsync()
{
ResumeThread();
await fht.CompletePendingAsync(this);
ResumeThread();

SuspendThread();
}

/// <summary>
Expand All @@ -178,11 +174,7 @@ public bool CompleteCheckpoint(bool spinWait = false)
/// <returns></returns>
internal async ValueTask CompleteCheckpointAsync()
{
ResumeThread();
await fht.CompleteCheckpointAsync(this);
ResumeThread();

SuspendThread();
}
}
}
73 changes: 27 additions & 46 deletions cs/src/core/ClientSession/FASTERAsync.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,56 +23,41 @@ public partial class FasterKV<Key, Value, Input, Output, Context, Functions> : F
/// <returns></returns>
internal async ValueTask CompletePendingAsync(ClientSession<Key, Value, Input, Output, Context, Functions> clientSession)
{
do
{
bool done = true;

#region Previous pending requests
if (threadCtx.Value.phase == Phase.IN_PROGRESS
||
threadCtx.Value.phase == Phase.WAIT_PENDING)
{
await CompleteIOPendingRequestsAsync(prevThreadCtx.Value, clientSession);
clientSession.ResumeThread();
bool done = true;

Debug.Assert(prevThreadCtx.Value.ioPendingRequests.Count == 0);

await InternalRefreshAsync(clientSession);
clientSession.ResumeThread();
#region Previous pending requests
if (threadCtx.Value.phase == Phase.IN_PROGRESS
||
threadCtx.Value.phase == Phase.WAIT_PENDING)
{

CompleteRetryRequests(prevThreadCtx.Value);
await CompleteIOPendingRequestsAsync(prevThreadCtx.Value, clientSession);
Debug.Assert(prevThreadCtx.Value.ioPendingRequests.Count == 0);

done &= (prevThreadCtx.Value.ioPendingRequests.Count == 0);
done &= (prevThreadCtx.Value.retryRequests.Count == 0);
}
#endregion
CompleteRetryRequests(prevThreadCtx.Value);

if (!(threadCtx.Value.phase == Phase.IN_PROGRESS
||
threadCtx.Value.phase == Phase.WAIT_PENDING))
{
await CompleteIOPendingRequestsAsync(threadCtx.Value, clientSession);
clientSession.ResumeThread();
done &= (prevThreadCtx.Value.ioPendingRequests.Count == 0);
done &= (prevThreadCtx.Value.retryRequests.Count == 0);
}
#endregion

Debug.Assert(threadCtx.Value.ioPendingRequests.Count == 0);
}
await InternalRefreshAsync(clientSession);
clientSession.ResumeThread();
if (!(threadCtx.Value.phase == Phase.IN_PROGRESS
||
threadCtx.Value.phase == Phase.WAIT_PENDING))
{
await CompleteIOPendingRequestsAsync(threadCtx.Value, clientSession);
Debug.Assert(threadCtx.Value.ioPendingRequests.Count == 0);
}

CompleteRetryRequests(threadCtx.Value);
CompleteRetryRequests(threadCtx.Value);

done &= (threadCtx.Value.ioPendingRequests.Count == 0);
done &= (threadCtx.Value.retryRequests.Count == 0);
done &= (threadCtx.Value.ioPendingRequests.Count == 0);
done &= (threadCtx.Value.retryRequests.Count == 0);

if (done)
{
return;
}
else
{
throw new Exception("CompletePending loops");
}
} while (true);
if (!done)
{
throw new Exception("CompletePendingAsync did not complete");
}
}

/// <summary>
Expand All @@ -89,15 +74,11 @@ internal async ValueTask CompleteCheckpointAsync(ClientSession<Key, Value, Input
do
{
await InternalRefreshAsync(clientSession);
clientSession.ResumeThread();

await CompletePendingAsync(clientSession);
clientSession.ResumeThread();

if (_systemState.phase == Phase.REST)
{
await CompletePendingAsync(clientSession);
clientSession.ResumeThread();
return;
}

Expand Down
13 changes: 10 additions & 3 deletions cs/src/core/Index/FASTER/FASTERThread.cs
Original file line number Diff line number Diff line change
Expand Up @@ -229,16 +229,23 @@ internal void CompleteIOPendingRequests(FasterExecutionContext context)

if (context.readyResponses.Count > 0)
{
context.readyResponses.TryDequeue(out request);
clientSession.ResumeThread();
while (context.readyResponses.Count > 0)
{
context.readyResponses.TryDequeue(out request);
InternalContinuePendingRequestAndCallback(context, request);
}
clientSession.SuspendThread();
}
else
{
clientSession.SuspendThread();
request = await context.readyResponses.DequeueAsync(token);

clientSession.ResumeThread();
InternalContinuePendingRequestAndCallback(context, request);
clientSession.SuspendThread();
}

InternalContinuePendingRequestAndCallback(context, request);
}
}

Expand Down

0 comments on commit 68df34d

Please sign in to comment.