Skip to content

Commit

Permalink
Add ReadyToCompletePendingAsync for client to check if there is somet… (
Browse files Browse the repository at this point in the history
#269)

* Add ReadyToCompletePendingAsync for client to check if there is something to complete-pending on.

* Updates
  • Loading branch information
badrishc authored May 29, 2020
1 parent 352db06 commit b073aa7
Show file tree
Hide file tree
Showing 5 changed files with 63 additions and 7 deletions.
19 changes: 18 additions & 1 deletion cs/src/core/ClientSession/ClientSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,8 @@ public void Refresh()
}

/// <summary>
/// Sync complete outstanding pending operations
/// Sync complete all outstanding pending operations
/// Async operations (ReadAsync) must be completed individually
/// </summary>
/// <param name="spinWait">Spin-wait for all pending operations on session to complete</param>
/// <param name="spinWaitForCommit">Extend spin-wait until ongoing commit/checkpoint, if any, completes</param>
Expand Down Expand Up @@ -365,6 +366,22 @@ public async ValueTask CompletePendingAsync(bool waitForCommit = false, Cancella
await WaitForCommitAsync(token);
}

/// <summary>
/// Check if at least one request is ready for CompletePending to be called on
/// Returns completed immediately if there are no outstanding requests
/// </summary>
/// <param name="token"></param>
/// <returns></returns>
public async ValueTask ReadyToCompletePendingAsync(CancellationToken token = default)
{
token.ThrowIfCancellationRequested();

if (fht.epoch.ThisInstanceProtected())
throw new NotSupportedException("Async operations not supported over protected epoch");

await fht.ReadyToCompletePendingAsync(this, token);
}

/// <summary>
/// Wait for commit of all operations completed until the current point in session.
/// Does not itself issue checkpoint/commits.
Expand Down
31 changes: 29 additions & 2 deletions cs/src/core/ClientSession/FASTERAsync.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,30 @@ public partial class FasterKV<Key, Value, Input, Output, Context, Functions> : F
where Value : new()
where Functions : IFunctions<Key, Value, Input, Output, Context>
{

/// <summary>
/// Check if at least one (sync) request is ready for CompletePending to operate on
/// </summary>
/// <param name="clientSession"></param>
/// <param name="token"></param>
/// <returns></returns>
internal async ValueTask ReadyToCompletePendingAsync(ClientSession<Key, Value, Input, Output, Context, Functions> clientSession, CancellationToken token = default)
{
#region Previous pending requests
if (!RelaxedCPR)
{
if (clientSession.ctx.phase == Phase.IN_PROGRESS || clientSession.ctx.phase == Phase.WAIT_PENDING)
{
if (clientSession.ctx.prevCtx.SyncIoPendingCount != 0)
await clientSession.ctx.prevCtx.readyResponses.WaitForEntryAsync(token);
}
}
#endregion

if (clientSession.ctx.SyncIoPendingCount != 0)
await clientSession.ctx.readyResponses.WaitForEntryAsync(token);
}

/// <summary>
/// Complete outstanding pending operations that were issued synchronously
/// Async operations (e.g., ReadAsync) need to be completed individually
Expand All @@ -46,7 +70,7 @@ internal async ValueTask CompletePendingAsync(ClientSession<Key, Value, Input, O
await clientSession.ctx.prevCtx.pendingReads.WaitEmptyAsync();

await InternalCompletePendingRequestsAsync(clientSession.ctx.prevCtx, clientSession.ctx, clientSession, token);
Debug.Assert(clientSession.ctx.prevCtx.ioPendingRequests.Count == 0);
Debug.Assert(clientSession.ctx.prevCtx.SyncIoPendingCount == 0);

if (clientSession.ctx.prevCtx.retryRequests.Count > 0)
{
Expand Down Expand Up @@ -106,7 +130,7 @@ internal ReadAsyncInternal(FasterKV<Key, Value, Input, Output, Context, Function
{
Debug.Assert(_fasterKV.RelaxedCPR);

_result = _fasterKV.InternalCompletePendingReadRequestAsync(
_result = _fasterKV.InternalCompletePendingReadRequest(
_clientSession.ctx, _clientSession.ctx, _diskRequest, _pendingContext);
}
finally
Expand All @@ -121,6 +145,7 @@ internal ReadAsyncInternal(FasterKV<Key, Value, Input, Output, Context, Function
finally
{
_clientSession.ctx.ioPendingRequests.Remove(_pendingContext.id);
_clientSession.ctx.asyncPendingCount--;
}
}

Expand Down Expand Up @@ -212,6 +237,7 @@ static async ValueTask<ReadAsyncResult> SlowReadAsync(
{
var diskRequest = @this.ScheduleGetFromDisk(clientSession.ctx, ref pendingContext);
clientSession.ctx.ioPendingRequests.Add(pendingContext.id, pendingContext);
clientSession.ctx.asyncPendingCount++;
clientSession.ctx.pendingReads.Add();

try
Expand All @@ -226,6 +252,7 @@ static async ValueTask<ReadAsyncResult> SlowReadAsync(
catch
{
clientSession.ctx.ioPendingRequests.Remove(pendingContext.id);
clientSession.ctx.asyncPendingCount--;
throw;
}
finally
Expand Down
6 changes: 4 additions & 2 deletions cs/src/core/Index/Common/Contexts.cs
Original file line number Diff line number Diff line change
Expand Up @@ -103,14 +103,16 @@ internal sealed class FasterExecutionContext : SerializedFasterExecutionContext
public AsyncCountDown pendingReads;
public AsyncQueue<AsyncIOContext<Key, Value>> readyResponses;
public List<long> excludedSerialNos;
public int asyncPendingCount;

public int SyncIoPendingCount => ioPendingRequests.Count - asyncPendingCount;

public bool HasNoPendingRequests
{
[MethodImpl(MethodImplOptions.AggressiveInlining)]
get
{
return ioPendingRequests.Count == 0
&& retryRequests.Count == 0;
return SyncIoPendingCount == 0 && retryRequests.Count == 0;
}
}

Expand Down
4 changes: 2 additions & 2 deletions cs/src/core/Index/FASTER/FASTERThread.cs
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@ internal void InternalCompletePendingRequests(FasterExecutionContext opCtx, Fast

internal async ValueTask InternalCompletePendingRequestsAsync(FasterExecutionContext opCtx, FasterExecutionContext currentCtx, ClientSession<Key, Value, Input, Output, Context, Functions> clientSession, CancellationToken token = default)
{
while (opCtx.ioPendingRequests.Count > 0)
while (opCtx.SyncIoPendingCount > 0)
{
AsyncIOContext<Key, Value> request;

Expand Down Expand Up @@ -363,7 +363,7 @@ internal void InternalCompletePendingRequest(FasterExecutionContext opCtx, Faste
}
}

internal (Status, Output) InternalCompletePendingReadRequestAsync(FasterExecutionContext opCtx, FasterExecutionContext currentCtx, AsyncIOContext<Key, Value> request, PendingContext pendingContext)
internal (Status, Output) InternalCompletePendingReadRequest(FasterExecutionContext opCtx, FasterExecutionContext currentCtx, AsyncIOContext<Key, Value> request, PendingContext pendingContext)
{
(Status, Output) s = default;

Expand Down
10 changes: 10 additions & 0 deletions cs/src/core/Utilities/AsyncQueue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,16 @@ public async Task<T> DequeueAsync(CancellationToken cancellationToken = default)
}
}

/// <summary>
/// Wait for queue to have at least one entry
/// </summary>
/// <param name="token"></param>
/// <returns></returns>
public async Task WaitForEntryAsync(CancellationToken token = default)
{
await semaphore.WaitAsync(token);
}

/// <summary>
/// Try dequeue (if item exists)
/// </summary>
Expand Down

0 comments on commit b073aa7

Please sign in to comment.