Skip to content

Commit

Permalink
Added tests
Browse files Browse the repository at this point in the history
  • Loading branch information
badrishc committed Dec 12, 2019
1 parent 0237a0c commit 255c000
Show file tree
Hide file tree
Showing 9 changed files with 250 additions and 28 deletions.
5 changes: 2 additions & 3 deletions cs/playground/FasterKVAsyncSample/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -69,10 +69,9 @@ static async Task AsyncOperator(int id)

if (!batched)
{
// Single commit version - append each item and wait for commit
// Single commit version - upsert each item and wait for commit
// Needs high parallelism (NumParallelTasks) for perf
// Needs separate commit thread to perform regular commit
// Otherwise we commit only at page boundaries
// Needs separate commit thread to perform regular checkpoints
while (true)
{
try
Expand Down
4 changes: 2 additions & 2 deletions cs/src/core/Allocator/MallocFixedPageSize.cs
Original file line number Diff line number Diff line change
Expand Up @@ -444,9 +444,9 @@ public bool IsCheckpointCompleted()
/// Is checkpoint completed
/// </summary>
/// <returns></returns>
public async ValueTask IsCheckpointCompletedAsync()
public async ValueTask IsCheckpointCompletedAsync(CancellationToken token = default)
{
await checkpointSemaphore.WaitAsync();
await checkpointSemaphore.WaitAsync(token);
checkpointSemaphore.Release();
}

Expand Down
27 changes: 23 additions & 4 deletions cs/src/core/ClientSession/ClientSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -88,16 +88,16 @@ public Status Read(ref Key key, ref Input input, ref Output output, Context user
/// <param name="token"></param>
/// <returns></returns>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public async ValueTask<Output> ReadAsync(Key key, Input input, bool waitForCommit = false, CancellationToken token = default)
public async ValueTask<(Status, Output)> ReadAsync(Key key, Input input, bool waitForCommit = false, CancellationToken token = default)
{
Output output = default;
Context context = default;
var status = Read(ref key, ref input, ref output, context, ctx.serialNum + 1);
if (status == Status.PENDING)
await CompletePendingAsync(waitForCommit, token);
return await CompletePendingReadAsync(ctx.serialNum, waitForCommit, token);
else if (waitForCommit)
await WaitForCommitAsync(token);
return output;
return (status, output);
}

/// <summary>
Expand Down Expand Up @@ -288,10 +288,29 @@ public async ValueTask CompletePendingAsync(bool waitForCommit = false, Cancella
if (fht.epoch.ThisInstanceProtected())
throw new NotSupportedException("Async operations not supported over protected epoch");

// Complete all pending operations on session
await fht.CompletePendingAsync(this, token);

// Wait for commit if necessary
if (waitForCommit)
await WaitForCommitAsync(token);
}

private async ValueTask<(Status, Output)> CompletePendingReadAsync(long serialNo, bool waitForCommit = false, CancellationToken token = default)
{
token.ThrowIfCancellationRequested();

if (fht.epoch.ThisInstanceProtected())
throw new NotSupportedException("Async operations not supported over protected epoch");

if (waitForCommit)
{
(Status, Output) s = await fht.CompletePendingReadAsync(serialNo, this, token);
await WaitForCommitAsync(token);
return s;
}
else
await fht.CompletePendingAsync(this, token);
return await fht.CompletePendingReadAsync(serialNo, this, token);
}

/// <summary>
Expand Down
54 changes: 41 additions & 13 deletions cs/src/core/ClientSession/FASTERAsync.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Runtime.CompilerServices;
using System.Runtime.InteropServices;
using System.Threading;
using System.Threading.Tasks;

Expand Down Expand Up @@ -71,21 +72,48 @@ internal async ValueTask CompletePendingAsync(ClientSession<Key, Value, Input, O
}

/// <summary>
/// Complete the ongoing checkpoint (if any)
/// Complete outstanding pending operations
/// </summary>
/// <returns></returns>
internal async ValueTask CompleteCheckpointAsync(FasterExecutionContext ctx, ClientSession<Key, Value, Input, Output, Context, Functions> clientSession)
internal async ValueTask<(Status, Output)> CompletePendingReadAsync(long serialNo, ClientSession<Key, Value, Input, Output, Context, Functions> clientSession, CancellationToken token = default)
{
// Called outside active session
while (true)
bool done = true;

#region Previous pending requests
if (!RelaxedCPR)
{
var systemState = _systemState;
await InternalRefreshAsync(ctx, clientSession);
await CompletePendingAsync(clientSession);
if (clientSession.ctx.phase == Phase.IN_PROGRESS
||
clientSession.ctx.phase == Phase.WAIT_PENDING)
{

await CompleteIOPendingRequestsAsync(clientSession.ctx.prevCtx, clientSession.ctx, clientSession, token);
Debug.Assert(clientSession.ctx.prevCtx.ioPendingRequests.Count == 0);

if (clientSession.ctx.prevCtx.retryRequests.Count > 0)
{
CompleteRetryRequests(clientSession.ctx.prevCtx, clientSession.ctx, clientSession);
}

if (systemState.phase == Phase.REST)
return;
done &= (clientSession.ctx.prevCtx.ioPendingRequests.Count == 0);
done &= (clientSession.ctx.prevCtx.retryRequests.Count == 0);
}
}
#endregion

var s = await CompleteIOPendingReadRequestsAsync(serialNo, clientSession.ctx, clientSession.ctx, clientSession, token);
CompleteRetryRequests(clientSession.ctx, clientSession.ctx, clientSession);

Debug.Assert(clientSession.ctx.ioPendingRequests.Count == 0);

done &= (clientSession.ctx.ioPendingRequests.Count == 0);
done &= (clientSession.ctx.retryRequests.Count == 0);

if (!done)
{
throw new Exception("CompletePendingAsync did not complete");
}
return s;
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
Expand Down Expand Up @@ -138,7 +166,7 @@ private SystemState GetStartState(SystemState state)
return SystemState.Make(Phase.REST, state.version);
}

private async ValueTask HandleCheckpointingPhasesAsync(FasterExecutionContext ctx, ClientSession<Key, Value, Input, Output, Context, Functions> clientSession, bool async = true)
private async ValueTask HandleCheckpointingPhasesAsync(FasterExecutionContext ctx, ClientSession<Key, Value, Input, Output, Context, Functions> clientSession, bool async = true, CancellationToken token = default)
{
if (async)
clientSession?.UnsafeResumeThread();
Expand Down Expand Up @@ -193,7 +221,7 @@ private async ValueTask HandleCheckpointingPhasesAsync(FasterExecutionContext ct
if (async && !IsIndexFuzzyCheckpointCompleted())
{
clientSession?.UnsafeSuspendThread();
await IsIndexFuzzyCheckpointCompletedAsync();
await IsIndexFuzzyCheckpointCompletedAsync(token);
clientSession?.UnsafeResumeThread();
}
GlobalMoveToNextCheckpointState(currentState);
Expand Down Expand Up @@ -300,7 +328,7 @@ private async ValueTask HandleCheckpointingPhasesAsync(FasterExecutionContext ct
{
Debug.Assert(_hybridLogCheckpoint.flushedSemaphore != null);
clientSession?.UnsafeSuspendThread();
await _hybridLogCheckpoint.flushedSemaphore.WaitAsync();
await _hybridLogCheckpoint.flushedSemaphore.WaitAsync(token);
clientSession?.UnsafeResumeThread();

_hybridLogCheckpoint.flushedSemaphore.Release();
Expand All @@ -315,7 +343,7 @@ private async ValueTask HandleCheckpointingPhasesAsync(FasterExecutionContext ct
if (async && !notify)
{
clientSession?.UnsafeSuspendThread();
await IsIndexFuzzyCheckpointCompletedAsync();
await IsIndexFuzzyCheckpointCompletedAsync(token);
clientSession?.UnsafeResumeThread();

notify = true;
Expand Down
113 changes: 113 additions & 0 deletions cs/src/core/Index/FASTER/FASTERThread.cs
Original file line number Diff line number Diff line change
Expand Up @@ -252,8 +252,37 @@ internal async ValueTask CompleteIOPendingRequestsAsync(FasterExecutionContext o
InternalContinuePendingRequestAndCallback(opCtx, currentCtx, request);
clientSession.UnsafeSuspendThread();
}
}
}

internal async ValueTask<(Status, Output)> CompleteIOPendingReadRequestsAsync(long serialNo, FasterExecutionContext opCtx, FasterExecutionContext currentCtx, ClientSession<Key, Value, Input, Output, Context, Functions> clientSession, CancellationToken token = default)
{
(Status, Output) s = default;

while (opCtx.ioPendingRequests.Count > 0)
{
AsyncIOContext<Key, Value> request;

if (opCtx.readyResponses.Count > 0)
{
clientSession.UnsafeResumeThread();
while (opCtx.readyResponses.Count > 0)
{
opCtx.readyResponses.TryDequeue(out request);
s = InternalContinuePendingRequestAndCallback(serialNo, opCtx, currentCtx, request);
}
clientSession.UnsafeSuspendThread();
}
else
{
request = await opCtx.readyResponses.DequeueAsync(token);

clientSession.UnsafeResumeThread();
s = InternalContinuePendingRequestAndCallback(serialNo, opCtx, currentCtx, request);
clientSession.UnsafeSuspendThread();
}
}
return s;
}

internal void InternalRetryRequestAndCallback(
Expand Down Expand Up @@ -416,5 +445,89 @@ internal void InternalContinuePendingRequestAndCallback(
}
}


internal (Status, Output) InternalContinuePendingRequestAndCallback(
long readSerialNo,
FasterExecutionContext opCtx,
FasterExecutionContext currentCtx,
AsyncIOContext<Key, Value> request)
{
bool handleLatches = false;
(Status, Output) s = default;

if (!RelaxedCPR)
{
if ((opCtx.version < currentCtx.version) // Thread has already shifted to (v+1)
||
(currentCtx.phase == Phase.PREPARE)) // Thread still in version v, but acquired shared-latch
{
handleLatches = true;
}
}

if (opCtx.ioPendingRequests.TryGetValue(request.id, out PendingContext pendingContext))
{
ref Key key = ref pendingContext.key.Get();

// Remove from pending dictionary
opCtx.ioPendingRequests.Remove(request.id);

OperationStatus internalStatus;
// Issue the continue command
if (pendingContext.type == OperationType.READ)
{
internalStatus = InternalContinuePendingRead(opCtx, request, ref pendingContext, currentCtx);
}
else
{
internalStatus = InternalContinuePendingRMW(opCtx, request, ref pendingContext, currentCtx); ;
}

request.Dispose();

Status status;
// Handle operation status
if (internalStatus == OperationStatus.SUCCESS || internalStatus == OperationStatus.NOTFOUND)
{
status = (Status)internalStatus;
}
else
{
status = HandleOperationStatus(opCtx, currentCtx, pendingContext, internalStatus);
}

// If done, callback user code
if (status == Status.OK || status == Status.NOTFOUND)
{
if (handleLatches)
ReleaseSharedLatch(key);

if (pendingContext.type == OperationType.READ)
{
functions.ReadCompletionCallback(ref key,
ref pendingContext.input,
ref pendingContext.output,
pendingContext.userContext,
status);
if (pendingContext.serialNum == readSerialNo)
{
s.Item1 = status;
s.Item2 = pendingContext.output;
}
}
else
{
functions.RMWCompletionCallback(ref key,
ref pendingContext.input,
pendingContext.userContext,
status);
}
}
pendingContext.Dispose();
}

return s;
}

}
}
10 changes: 5 additions & 5 deletions cs/src/core/Index/Recovery/IndexCheckpoint.cs
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,10 @@ internal bool IsIndexFuzzyCheckpointCompleted()
return completed1 && completed2;
}

internal async ValueTask IsIndexFuzzyCheckpointCompletedAsync()
internal async ValueTask IsIndexFuzzyCheckpointCompletedAsync(CancellationToken token = default)
{
await IsMainIndexCheckpointCompletedAsync();
await overflowBucketsAllocator.IsCheckpointCompletedAsync();
await IsMainIndexCheckpointCompletedAsync(token);
await overflowBucketsAllocator.IsCheckpointCompletedAsync(token);
}


Expand Down Expand Up @@ -104,11 +104,11 @@ private bool IsMainIndexCheckpointCompleted()
return mainIndexCheckpointCallbackCount == 0;
}

private async ValueTask IsMainIndexCheckpointCompletedAsync()
private async ValueTask IsMainIndexCheckpointCompletedAsync(CancellationToken token = default)
{
if (mainIndexCheckpointCallbackCount > 0)
{
await mainIndexCheckpointSemaphore.WaitAsync();
await mainIndexCheckpointSemaphore.WaitAsync(token);
mainIndexCheckpointSemaphore.Release();
}
}
Expand Down
2 changes: 1 addition & 1 deletion cs/src/core/Utilities/AsyncQueue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public void Enqueue(T item)
/// </summary>
/// <param name="cancellationToken"></param>
/// <returns></returns>
public async Task<T> DequeueAsync(CancellationToken cancellationToken = default(CancellationToken))
public async Task<T> DequeueAsync(CancellationToken cancellationToken = default)
{
for (; ; )
{
Expand Down
Loading

0 comments on commit 255c000

Please sign in to comment.