Skip to content

Commit

Permalink
Speed up async read (#256)
Browse files Browse the repository at this point in the history
* Speed up async read by 2X when operations do not go async, and when operations go async.

Ensure session ops do not leave epochs unprotected.

* Updates

* Update folder for sample
  • Loading branch information
badrishc committed Mar 17, 2020
1 parent 26569b2 commit 7122495
Show file tree
Hide file tree
Showing 13 changed files with 162 additions and 228 deletions.
20 changes: 10 additions & 10 deletions cs/FASTER.sln
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "FasterKVAsyncSample", "play
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "ClassRecoveryDurablity", "playground\ClassRecoveryDurablity\ClassRecoveryDurablity.csproj", "{95AC8766-84F9-4E95-B2E9-2169B6375FB2}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "FasterKVDiskReadBenchmark", "playground\FasterKVAsyncStructSample\FasterKVDiskReadBenchmark.csproj", "{993A6898-4B90-4112-8E3F-D43C764953A4}"
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "FasterKVDiskReadBenchmark", "playground\FasterKVDiskReadBenchmark\FasterKVDiskReadBenchmark.csproj", "{642DCE86-1BAA-4FFF-98BF-0FB9BB11CD49}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Expand Down Expand Up @@ -188,14 +188,14 @@ Global
{95AC8766-84F9-4E95-B2E9-2169B6375FB2}.Release|Any CPU.Build.0 = Release|x64
{95AC8766-84F9-4E95-B2E9-2169B6375FB2}.Release|x64.ActiveCfg = Release|x64
{95AC8766-84F9-4E95-B2E9-2169B6375FB2}.Release|x64.Build.0 = Release|x64
{993A6898-4B90-4112-8E3F-D43C764953A4}.Debug|Any CPU.ActiveCfg = Debug|x64
{993A6898-4B90-4112-8E3F-D43C764953A4}.Debug|Any CPU.Build.0 = Debug|x64
{993A6898-4B90-4112-8E3F-D43C764953A4}.Debug|x64.ActiveCfg = Debug|x64
{993A6898-4B90-4112-8E3F-D43C764953A4}.Debug|x64.Build.0 = Debug|x64
{993A6898-4B90-4112-8E3F-D43C764953A4}.Release|Any CPU.ActiveCfg = Release|x64
{993A6898-4B90-4112-8E3F-D43C764953A4}.Release|Any CPU.Build.0 = Release|x64
{993A6898-4B90-4112-8E3F-D43C764953A4}.Release|x64.ActiveCfg = Release|x64
{993A6898-4B90-4112-8E3F-D43C764953A4}.Release|x64.Build.0 = Release|x64
{642DCE86-1BAA-4FFF-98BF-0FB9BB11CD49}.Debug|Any CPU.ActiveCfg = Debug|x64
{642DCE86-1BAA-4FFF-98BF-0FB9BB11CD49}.Debug|Any CPU.Build.0 = Debug|x64
{642DCE86-1BAA-4FFF-98BF-0FB9BB11CD49}.Debug|x64.ActiveCfg = Debug|x64
{642DCE86-1BAA-4FFF-98BF-0FB9BB11CD49}.Debug|x64.Build.0 = Debug|x64
{642DCE86-1BAA-4FFF-98BF-0FB9BB11CD49}.Release|Any CPU.ActiveCfg = Release|x64
{642DCE86-1BAA-4FFF-98BF-0FB9BB11CD49}.Release|Any CPU.Build.0 = Release|x64
{642DCE86-1BAA-4FFF-98BF-0FB9BB11CD49}.Release|x64.ActiveCfg = Release|x64
{642DCE86-1BAA-4FFF-98BF-0FB9BB11CD49}.Release|x64.Build.0 = Release|x64
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand All @@ -219,7 +219,7 @@ Global
{25C5C6B6-4A8A-46DD-88C1-EB247033FE58} = {E6026D6A-01C5-4582-B2C1-64751490DABE}
{859F76F4-93D8-4D60-BF9A-363E217FA247} = {E6026D6A-01C5-4582-B2C1-64751490DABE}
{95AC8766-84F9-4E95-B2E9-2169B6375FB2} = {E6026D6A-01C5-4582-B2C1-64751490DABE}
{993A6898-4B90-4112-8E3F-D43C764953A4} = {E6026D6A-01C5-4582-B2C1-64751490DABE}
{642DCE86-1BAA-4FFF-98BF-0FB9BB11CD49} = {E6026D6A-01C5-4582-B2C1-64751490DABE}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {A0750637-2CCB-4139-B25E-F2CE740DCFAC}
Expand Down
131 changes: 64 additions & 67 deletions cs/src/core/ClientSession/ClientSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -74,46 +74,30 @@ public void Dispose()
public Status Read(ref Key key, ref Input input, ref Output output, Context userContext, long serialNo)
{
if (SupportAsync) UnsafeResumeThread();
var status = fht.ContextRead(ref key, ref input, ref output, userContext, serialNo, ctx);
if (SupportAsync) UnsafeSuspendThread();
return status;
try
{
return fht.ContextRead(ref key, ref input, ref output, userContext, serialNo, ctx);
}
finally
{
if (SupportAsync) UnsafeSuspendThread();
}
}

/// <summary>
/// Read operation
/// Async read operation, may return uncommitted result
/// To ensure reading of committed result, complete the read and then call WaitForCommitAsync.
/// </summary>
/// <param name="key"></param>
/// <param name="input"></param>
/// <param name="context"></param>
/// <param name="waitForCommit"></param>
/// <param name="token"></param>
/// <returns></returns>
/// <returns>ReadAsyncResult - call CompleteRead on the return value to complete the read operation</returns>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public ValueTask<FasterKV<Key, Value, Input, Output, Context, Functions>.ReadAsyncResult> ReadAsync(ref Key key, ref Input input, Context context = default, bool waitForCommit = false, CancellationToken token = default)
public ValueTask<FasterKV<Key, Value, Input, Output, Context, Functions>.ReadAsyncResult> ReadAsync(ref Key key, ref Input input, Context context = default, CancellationToken token = default)
{
var readResult = fht.ReadAsync(this, ref key, ref input, context, token);

if (readResult.IsCompleted && !waitForCommit)
return readResult;

return SlowReadAsync(this, readResult, waitForCommit, token);

static async ValueTask<FasterKV<Key, Value, Input, Output, Context, Functions>.ReadAsyncResult> SlowReadAsync(
ClientSession<Key, Value, Input, Output, Context, Functions> @this,
ValueTask<FasterKV<Key, Value, Input, Output, Context, Functions>.ReadAsyncResult> readResult,
bool waitForCommit, CancellationToken token
)
{
var s = await readResult;

if (waitForCommit)
await @this.WaitForCommitAsync(token);

return s;
}
}


return fht.ReadAsync(this, ref key, ref input, context, token);
}

/// <summary>
/// Upsert operation
Expand All @@ -127,9 +111,14 @@ public Status Read(ref Key key, ref Input input, ref Output output, Context user
public Status Upsert(ref Key key, ref Value desiredValue, Context userContext, long serialNo)
{
if (SupportAsync) UnsafeResumeThread();
var status = fht.ContextUpsert(ref key, ref desiredValue, userContext, serialNo, ctx);
if (SupportAsync) UnsafeSuspendThread();
return status;
try
{
return fht.ContextUpsert(ref key, ref desiredValue, userContext, serialNo, ctx);
}
finally
{
if (SupportAsync) UnsafeSuspendThread();
}
}

/// <summary>
Expand All @@ -156,7 +145,6 @@ static async ValueTask SlowUpsertAsync(
bool waitForCommit, Status status, CancellationToken token
)
{

if (status == Status.PENDING)
await @this.CompletePendingAsync(waitForCommit, token);
else if (waitForCommit)
Expand All @@ -178,13 +166,19 @@ static async ValueTask SlowUpsertAsync(
public Status RMW(ref Key key, ref Input input, Context userContext, long serialNo)
{
if (SupportAsync) UnsafeResumeThread();
var status = fht.ContextRMW(ref key, ref input, userContext, serialNo, ctx);
if (SupportAsync) UnsafeSuspendThread();
return status;
try
{
return fht.ContextRMW(ref key, ref input, userContext, serialNo, ctx);
}
finally
{
if (SupportAsync) UnsafeSuspendThread();
}
}

/// <summary>
/// RMW operation
/// Async RMW operation
/// Await operation in session before issuing next one
/// </summary>
/// <param name="key"></param>
/// <param name="input"></param>
Expand Down Expand Up @@ -228,13 +222,18 @@ static async ValueTask SlowRMWAsync(
public Status Delete(ref Key key, Context userContext, long serialNo)
{
if (SupportAsync) UnsafeResumeThread();
var status = fht.ContextDelete(ref key, userContext, serialNo, ctx);
if (SupportAsync) UnsafeSuspendThread();
return status;
try
{
return fht.ContextDelete(ref key, userContext, serialNo, ctx);
}
finally
{
if (SupportAsync) UnsafeSuspendThread();
}
}

/// <summary>
/// Delete operation
/// Async delete operation
/// </summary>
/// <param name="key"></param>
/// <param name="waitForCommit"></param>
Expand Down Expand Up @@ -286,19 +285,12 @@ internal Status ContainsKeyInMemory(ref Key key, long fromAddress = -1)
/// <returns></returns>
public IEnumerable<long> GetPendingRequests()
{
foreach (var val in ctx.prevCtx?.pendingReads)
yield return val.Key;

foreach (var kvp in ctx.prevCtx?.ioPendingRequests)
yield return kvp.Value.serialNum;

foreach (var val in ctx.prevCtx?.retryRequests)
yield return val.serialNum;


foreach (var val in ctx.pendingReads)
yield return val.Key;

foreach (var kvp in ctx.ioPendingRequests)
yield return kvp.Value.serialNum;

Expand Down Expand Up @@ -326,31 +318,36 @@ public void Refresh()
public bool CompletePending(bool spinWait = false, bool spinWaitForCommit = false)
{
if (SupportAsync) UnsafeResumeThread();
var result = fht.InternalCompletePending(ctx, spinWait);
if (spinWaitForCommit)
try
{
if (spinWait != true)
var result = fht.InternalCompletePending(ctx, spinWait);
if (spinWaitForCommit)
{
if (SupportAsync) UnsafeSuspendThread();
throw new FasterException("Can spin-wait for checkpoint completion only if spinWait is true");
}
do
{
fht.InternalCompletePending(ctx, spinWait);
if (fht.InRestPhase())
if (spinWait != true)
{
fht.InternalCompletePending(ctx, spinWait);
if (SupportAsync) UnsafeSuspendThread();
return true;
throw new FasterException("Can spin-wait for checkpoint completion only if spinWait is true");
}
} while (spinWait);
do
{
fht.InternalCompletePending(ctx, spinWait);
if (fht.InRestPhase())
{
fht.InternalCompletePending(ctx, spinWait);
return true;
}
} while (spinWait);
}
return result;
}
finally
{
if (SupportAsync) UnsafeSuspendThread();
}
if (SupportAsync) UnsafeSuspendThread();
return result;
}

/// <summary>
/// Async complete outstanding pending operations
/// Complete all outstanding pending operations asynchronously
/// Async operations (ReadAsync) must be completed individually
/// </summary>
/// <returns></returns>
public async ValueTask CompletePendingAsync(bool waitForCommit = false, CancellationToken token = default)
Expand All @@ -369,7 +366,7 @@ public async ValueTask CompletePendingAsync(bool waitForCommit = false, Cancella
}

/// <summary>
/// Wait for commit of all operations until current point in session.
/// Wait for commit of all operations completed until the current point in session.
/// Does not itself issue checkpoint/commits.
/// </summary>
/// <returns></returns>
Expand Down
Loading

0 comments on commit 7122495

Please sign in to comment.