Skip to content

Commit

Permalink
[C#] Async version of RMW that returns status (#339)
Browse files Browse the repository at this point in the history
* Async version of RMW that returns status
* Improved async Read to share code with sync path
* Added and fixed testcases.
* Updated UpsertAsync to similar API and structure
Removed waitForCommit as parameter, users can explicitly call WaitForCommitAsync()
* Rename CompleteRead() and CompleteRMW() to Complete()
* Remove UpsertAsync as Upserts do not go async in RelaxedCPR
* Added serialNo support to async ops
* Users can ask session for current SerialNo and NextSerialNo to help use serialnos if they care about recovery prefixes
* Removed code for DeleteAsync and UpsertAsync - neither can go async in relaxed CPR (default), and strict CPR is expected to use non-async API.
* Fix tests that passed non-monotonic SerialNo

Co-authored-by: TedHartMS <15467143+TedHartMS@users.noreply.github.com>
  • Loading branch information
badrishc and TedHartMS committed Sep 26, 2020
1 parent f35a709 commit 7fa7434
Show file tree
Hide file tree
Showing 17 changed files with 555 additions and 377 deletions.
5 changes: 3 additions & 2 deletions cs/samples/StoreAsyncApi/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,8 @@ static async Task AsyncOperator(int id)

var key = new CacheKey(rand.Next());
var value = new CacheValue(rand.Next());
await session.UpsertAsync(ref key, ref value, context, true);
session.Upsert(ref key, ref value, context);
await session.WaitForCommitAsync();

Interlocked.Increment(ref numOps);
}
Expand All @@ -103,7 +104,7 @@ static async Task AsyncOperator(int id)

var key = new CacheKey(rand.Next());
var value = new CacheValue(rand.Next());
await session.UpsertAsync(ref key, ref value, context);
session.Upsert(ref key, ref value, context);

if (count++ % 100 == 0)
{
Expand Down
9 changes: 3 additions & 6 deletions cs/samples/StoreDiskReadBenchmark/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -88,10 +88,7 @@ static async Task AsyncUpsertOperator(int id)
{
key = new Key(i);
value = new Value(i);
if (useAsync)
await session.UpsertAsync(ref key, ref value);
else
session.Upsert(ref key, ref value, Empty.Default, 0);
session.Upsert(ref key, ref value, Empty.Default, 0);
Interlocked.Increment(ref numOps);

if (periodicCommit && i % 100 == 0)
Expand Down Expand Up @@ -135,7 +132,7 @@ static async Task AsyncReadOperator(int id)
}
else
{
var result = (await session.ReadAsync(ref key, ref input)).CompleteRead();
var result = (await session.ReadAsync(ref key, ref input)).Complete();
if (result.Item1 != Status.OK || result.Item2.value.vfield1 != key.key)
{
throw new Exception("Wrong value found");
Expand Down Expand Up @@ -178,7 +175,7 @@ static async Task AsyncReadOperator(int id)
{
for (int j = 0; j < readBatchSize; j++)
{
var result = (await tasks[j].Item2).CompleteRead();
var result = (await tasks[j].Item2).Complete();
if (result.Item1 != Status.OK || result.Item2.value.vfield1 != tasks[j].Item1)
{
throw new Exception($"Wrong value found. Found: {result.Item2.value.vfield1}, Expected: {tasks[j].Item1}");
Expand Down
154 changes: 36 additions & 118 deletions cs/src/core/ClientSession/ClientSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,16 @@ internal ClientSession(
/// </summary>
public string ID { get { return ctx.guid; } }

/// <summary>
/// Next sequential serial no for session (current serial no + 1)
/// </summary>
public long NextSerialNo => ctx.serialNum + 1;

/// <summary>
/// Current serial no for session
/// </summary>
public long SerialNo => ctx.serialNum;

/// <summary>
/// Dispose session
/// </summary>
Expand All @@ -96,7 +106,7 @@ public void Dispose()
/// <param name="serialNo"></param>
/// <returns></returns>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public Status Read(ref Key key, ref Input input, ref Output output, Context userContext, long serialNo)
public Status Read(ref Key key, ref Input input, ref Output output, Context userContext = default, long serialNo = 0)
{
if (SupportAsync) UnsafeResumeThread();
try
Expand Down Expand Up @@ -139,79 +149,46 @@ public Status Read(ref Key key, ref Output output, Context userContext = default
/// <param name="key"></param>
/// <param name="input"></param>
/// <param name="context"></param>
/// <param name="serialNo"></param>
/// <param name="token"></param>
/// <returns>ReadAsyncResult - call CompleteRead on the return value to complete the read operation</returns>
/// <returns>ReadAsyncResult - call Complete() on the return value to complete the read operation</returns>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public ValueTask<FasterKV<Key, Value>.ReadAsyncResult<Input, Output, Context, Functions>> ReadAsync(ref Key key, ref Input input, Context context = default, CancellationToken token = default)
public ValueTask<FasterKV<Key, Value>.ReadAsyncResult<Input, Output, Context, Functions>> ReadAsync(ref Key key, ref Input input, Context context = default, long serialNo = 0, CancellationToken token = default)
{
return fht.ReadAsync(this, ref key, ref input, context, token);
return fht.ReadAsync(this, ref key, ref input, context, serialNo, token);
}

/// <summary>
/// Upsert operation
/// Async read operation, may return uncommitted result
/// To ensure reading of committed result, complete the read and then call WaitForCommitAsync.
/// </summary>
/// <param name="key"></param>
/// <param name="desiredValue"></param>
/// <param name="userContext"></param>
/// <param name="context"></param>
/// <param name="serialNo"></param>
/// <returns></returns>
/// <param name="token"></param>
/// <returns>ReadAsyncResult - call Complete() on the return value to complete the read operation</returns>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public Status Upsert(ref Key key, ref Value desiredValue, Context userContext = default, long serialNo = 0)
public ValueTask<FasterKV<Key, Value>.ReadAsyncResult<Input, Output, Context, Functions>> ReadAsync(ref Key key, Context context = default, long serialNo = 0, CancellationToken token = default)
{
if (SupportAsync) UnsafeResumeThread();
try
{
return fht.ContextUpsert(ref key, ref desiredValue, userContext, FasterSession, serialNo, ctx);
}
finally
{
if (SupportAsync) UnsafeSuspendThread();
}
Input input = default;
return fht.ReadAsync(this, ref key, ref input, context, serialNo, token);
}

/// <summary>
/// Upsert operation
/// </summary>
/// <param name="key"></param>
/// <param name="desiredValue"></param>
/// <param name="context"></param>
/// <param name="waitForCommit"></param>
/// <param name="token"></param>
/// <returns></returns>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public ValueTask UpsertAsync(ref Key key, ref Value desiredValue, Context context = default, bool waitForCommit = false, CancellationToken token = default)
{
var status = Upsert(ref key, ref desiredValue, context, ctx.serialNum + 1);

if (status == Status.OK && !waitForCommit)
return default;

return SlowUpsertAsync(this, waitForCommit, status, token);
}

private static async ValueTask SlowUpsertAsync(ClientSession<Key, Value, Input, Output, Context, Functions> @this, bool waitForCommit, Status status, CancellationToken token)
{
if (status == Status.PENDING)
await @this.CompletePendingAsync(waitForCommit, token);
else if (waitForCommit)
await @this.WaitForCommitAsync(token);
}

/// <summary>
/// RMW operation
/// </summary>
/// <param name="key"></param>
/// <param name="input"></param>
/// <param name="userContext"></param>
/// <param name="serialNo"></param>
/// <returns></returns>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public Status RMW(ref Key key, ref Input input, Context userContext, long serialNo)
public Status Upsert(ref Key key, ref Value desiredValue, Context userContext = default, long serialNo = 0)
{
if (SupportAsync) UnsafeResumeThread();
try
{
return fht.ContextRMW(ref key, ref input, userContext, FasterSession, serialNo, ctx);
return fht.ContextUpsert(ref key, ref desiredValue, userContext, FasterSession, serialNo, ctx);
}
finally
{
Expand All @@ -224,14 +201,16 @@ public Status RMW(ref Key key, ref Input input, Context userContext, long serial
/// </summary>
/// <param name="key"></param>
/// <param name="input"></param>
/// <param name="userContext"></param>
/// <param name="serialNo"></param>
/// <returns></returns>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public Status RMW(ref Key key, ref Input input)
public Status RMW(ref Key key, ref Input input, Context userContext = default, long serialNo = 0)
{
if (SupportAsync) UnsafeResumeThread();
try
{
return fht.ContextRMW(ref key, ref input, default, FasterSession, 0, ctx);
return fht.ContextRMW(ref key, ref input, userContext, FasterSession, serialNo, ctx);
}
finally
{
Expand All @@ -246,27 +225,13 @@ public Status RMW(ref Key key, ref Input input)
/// <param name="key"></param>
/// <param name="input"></param>
/// <param name="context"></param>
/// <param name="waitForCommit"></param>
/// <param name="serialNo"></param>
/// <param name="token"></param>
/// <returns></returns>
/// <returns>ValueTask for RMW result, user needs to await and then call Complete() on the result</returns>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public ValueTask RMWAsync(ref Key key, ref Input input, Context context = default, bool waitForCommit = false, CancellationToken token = default)
public ValueTask<FasterKV<Key, Value>.RmwAsyncResult<Input, Output, Context, Functions>> RMWAsync(ref Key key, ref Input input, Context context = default, long serialNo = 0, CancellationToken token = default)
{
var status = RMW(ref key, ref input, context, ctx.serialNum + 1);

if (status == Status.OK && !waitForCommit)
return default;

return SlowRMWAsync(this, waitForCommit, status, token);
}

private static async ValueTask SlowRMWAsync(ClientSession<Key, Value, Input, Output, Context, Functions> @this, bool waitForCommit, Status status, CancellationToken token)
{

if (status == Status.PENDING)
await @this.CompletePendingAsync(waitForCommit, token);
else if (waitForCommit)
await @this.WaitForCommitAsync(token);
return fht.RmwAsync(this, ref key, ref input, context, serialNo, token);
}

/// <summary>
Expand All @@ -277,7 +242,7 @@ private static async ValueTask SlowRMWAsync(ClientSession<Key, Value, Input, Out
/// <param name="serialNo"></param>
/// <returns></returns>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public Status Delete(ref Key key, Context userContext, long serialNo)
public Status Delete(ref Key key, Context userContext = default, long serialNo = 0)
{
if (SupportAsync) UnsafeResumeThread();
try
Expand All @@ -290,53 +255,6 @@ public Status Delete(ref Key key, Context userContext, long serialNo)
}
}

/// <summary>
/// Delete operation
/// </summary>
/// <param name="key"></param>
/// <returns></returns>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public Status Delete(ref Key key)
{
if (SupportAsync) UnsafeResumeThread();
try
{
return fht.ContextDelete(ref key, default, FasterSession, 0, ctx);
}
finally
{
if (SupportAsync) UnsafeSuspendThread();
}
}

/// <summary>
/// Async delete operation
/// </summary>
/// <param name="key"></param>
/// <param name="waitForCommit"></param>
/// <param name="context"></param>
/// <param name="token"></param>
/// <returns></returns>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public ValueTask DeleteAsync(ref Key key, Context context = default, bool waitForCommit = false, CancellationToken token = default)
{
var status = Delete(ref key, context, ctx.serialNum + 1);

if (status == Status.OK && !waitForCommit)
return default;

return SlowDeleteAsync(this, waitForCommit, status, token);
}

private static async ValueTask SlowDeleteAsync(ClientSession<Key, Value, Input, Output, Context, Functions> @this, bool waitForCommit, Status status, CancellationToken token)
{

if (status == Status.PENDING)
await @this.CompletePendingAsync(waitForCommit, token);
else if (waitForCommit)
await @this.WaitForCommitAsync(token);
}

/// <summary>
/// Experimental feature
/// Checks whether specified record is present in memory
Expand Down Expand Up @@ -539,12 +457,12 @@ internal ReadAsyncResult(
/// Complete the read operation, after any I/O is completed.
/// </summary>
/// <returns>The read result, or throws an exception if error encountered.</returns>
public (Status, Output) CompleteRead()
public (Status, Output) Complete()
{
if (status != Status.PENDING)
return (status, output);

return readAsyncInternal.CompleteRead();
return readAsyncInternal.Complete();
}
}

Expand Down
Loading

0 comments on commit 7fa7434

Please sign in to comment.