Skip to content

Commit

Permalink
Ref keys on async methods (#242)
Browse files Browse the repository at this point in the history
* User should be able to provide context class when using async methods

* Async methods could support ref Keys

* Fix typo

* Short cicuit when possible.

* Adequate to static nested SlowMethods
  • Loading branch information
ThiagoT1 committed Feb 13, 2020
1 parent 8f14cf4 commit bd5ad88
Show file tree
Hide file tree
Showing 3 changed files with 103 additions and 32 deletions.
14 changes: 11 additions & 3 deletions cs/playground/FasterKVAsyncSample/Program.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT license.

using System;
Expand Down Expand Up @@ -78,7 +78,11 @@ static async Task AsyncOperator(int id)
{
try
{
await session.UpsertAsync(new CacheKey(rand.Next()), new CacheValue(rand.Next()), context, true);

var key = new CacheKey(rand.Next());
var value = new CacheValue(rand.Next());
await session.UpsertAsync(ref key, ref value, context, true);

Interlocked.Increment(ref numOps);
}
catch (Exception ex)
Expand All @@ -95,7 +99,11 @@ static async Task AsyncOperator(int id)

while (true)
{
await session.UpsertAsync(new CacheKey(rand.Next()), new CacheValue(rand.Next()), context);

var key = new CacheKey(rand.Next());
var value = new CacheValue(rand.Next());
await session.UpsertAsync(ref key, ref value, context);

if (count++ % 100 == 0)
{
await session.WaitForCommitAsync();
Expand Down
104 changes: 82 additions & 22 deletions cs/src/core/ClientSession/ClientSession.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT license.

#pragma warning disable 0162
Expand Down Expand Up @@ -89,17 +89,32 @@ public Status Read(ref Key key, ref Input input, ref Output output, Context user
/// <param name="token"></param>
/// <returns></returns>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public async ValueTask<(Status, Output)> ReadAsync(Key key, Input input, Context context = default, bool waitForCommit = false, CancellationToken token = default)
public ValueTask<(Status, Output)> ReadAsync(ref Key key, ref Input input, Context context = default, bool waitForCommit = false, CancellationToken token = default)
{
Output output = default;
var status = Read(ref key, ref input, ref output, context, ctx.serialNum + 1);
if (status == Status.PENDING)
return await CompletePendingReadAsync(ctx.serialNum, waitForCommit, token);
else if (waitForCommit)
await WaitForCommitAsync(token);
return (status, output);

if ((status == Status.OK || status == Status.NOTFOUND) && !waitForCommit)
return new ValueTask<(Status, Output)>((status, output));

return SlowReadAsync(this, waitForCommit, output, status, token);

static async ValueTask<(Status, Output)> SlowReadAsync(
ClientSession<Key, Value, Input, Output, Context, Functions> @this,
bool waitForCommit, Output output, Status status, CancellationToken token
)
{

if (status == Status.PENDING)
return await @this.CompletePendingReadAsync(@this.ctx.serialNum, waitForCommit, token);
else if (waitForCommit)
await @this.WaitForCommitAsync(token);
return (status, output);
}
}



/// <summary>
/// Upsert operation
/// </summary>
Expand Down Expand Up @@ -127,15 +142,30 @@ public Status Upsert(ref Key key, ref Value desiredValue, Context userContext, l
/// <param name="token"></param>
/// <returns></returns>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public async ValueTask UpsertAsync(Key key, Value desiredValue, Context context = default, bool waitForCommit = false, CancellationToken token = default)
public ValueTask UpsertAsync(ref Key key, ref Value desiredValue, Context context = default, bool waitForCommit = false, CancellationToken token = default)
{
var status = Upsert(ref key, ref desiredValue, context, ctx.serialNum + 1);
if (status == Status.PENDING)
await CompletePendingAsync(waitForCommit, token);
else if (waitForCommit)
await WaitForCommitAsync(token);

if (status == Status.OK && !waitForCommit)
return default;

return SlowUpsertAsync(this, waitForCommit, status, token);

static async ValueTask SlowUpsertAsync(
ClientSession<Key, Value, Input, Output, Context, Functions> @this,
bool waitForCommit, Status status, CancellationToken token
)
{

if (status == Status.PENDING)
await @this.CompletePendingAsync(waitForCommit, token);
else if (waitForCommit)
await @this.WaitForCommitAsync(token);
}
}



/// <summary>
/// RMW operation
/// </summary>
Expand Down Expand Up @@ -163,15 +193,30 @@ public Status RMW(ref Key key, ref Input input, Context userContext, long serial
/// <param name="token"></param>
/// <returns></returns>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public async ValueTask RMWAsync(Key key, Input input, Context context = default, bool waitForCommit = false, CancellationToken token = default)
public ValueTask RMWAsync(ref Key key, ref Input input, Context context = default, bool waitForCommit = false, CancellationToken token = default)
{
var status = RMW(ref key, ref input, context, ctx.serialNum + 1);
if (status == Status.PENDING)
await CompletePendingAsync(waitForCommit, token);
else if (waitForCommit)
await WaitForCommitAsync(token);

if (status == Status.OK && !waitForCommit)
return default;

return SlowRMWAsync(this, waitForCommit, status, token);

static async ValueTask SlowRMWAsync(
ClientSession<Key, Value, Input, Output, Context, Functions> @this,
bool waitForCommit, Status status, CancellationToken token
)
{

if (status == Status.PENDING)
await @this.CompletePendingAsync(waitForCommit, token);
else if (waitForCommit)
await @this.WaitForCommitAsync(token);
}
}



/// <summary>
/// Delete operation
/// </summary>
Expand All @@ -196,15 +241,30 @@ public Status Delete(ref Key key, Context userContext, long serialNo)
/// <param name="token"></param>
/// <returns></returns>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public async ValueTask DeleteAsync(Key key, Context context = default, bool waitForCommit = false, CancellationToken token = default)
public ValueTask DeleteAsync(ref Key key, Context context = default, bool waitForCommit = false, CancellationToken token = default)
{
var status = Delete(ref key, context, ctx.serialNum + 1);
if (status == Status.PENDING)
await CompletePendingAsync(waitForCommit, token);
else if (waitForCommit)
await WaitForCommitAsync(token);

if (status == Status.OK && !waitForCommit)
return default;

return SlowDeleteAsync(this, waitForCommit, status, token);

static async ValueTask SlowDeleteAsync(
ClientSession<Key, Value, Input, Output, Context, Functions> @this,
bool waitForCommit, Status status, CancellationToken token
)
{

if (status == Status.PENDING)
await @this.CompletePendingAsync(waitForCommit, token);
else if (waitForCommit)
await @this.WaitForCommitAsync(token);
}
}



/// <summary>
/// Experimental feature
/// Checks whether specified record is present in memory
Expand Down
17 changes: 10 additions & 7 deletions cs/test/ObjectFASTERTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -170,38 +170,41 @@ public async Task AsyncObjectDiskWriteRead()
{
var key = new MyKey { key = i };
var value = new MyValue { value = i };
await session.UpsertAsync(key, value, Empty.Default);
await session.UpsertAsync(ref key, ref value, Empty.Default);
}

var key1 = new MyKey { key = 1989 };
var input = new MyInput();
var result = await session.ReadAsync(key1, input, Empty.Default);
var result = await session.ReadAsync(ref key1, ref input, Empty.Default);
Assert.IsTrue(result.Item1 == Status.OK);
Assert.IsTrue(result.Item2.value.value == 1989);

var key2 = new MyKey { key = 23 };
result = await session.ReadAsync(key2, input, Empty.Default);
result = await session.ReadAsync(ref key2, ref input, Empty.Default);

Assert.IsTrue(result.Item1 == Status.OK);
Assert.IsTrue(result.Item2.value.value == 23);

var key3 = new MyKey { key = 9999 };
result = await session.ReadAsync(key3, input, Empty.Default);
result = await session.ReadAsync(ref key3, ref input, Empty.Default);

Assert.IsTrue(result.Item1 == Status.NOTFOUND);

// Update last 100 using RMW in memory
for (int i = 1900; i < 2000; i++)
{
var key = new MyKey { key = i };
input = new MyInput { value = 1 };
await session.RMWAsync(key, input, Empty.Default);
await session.RMWAsync(ref key, ref input, Empty.Default);

}

// Update first 100 using RMW from storage
for (int i = 0; i < 100; i++)
{
var key = new MyKey { key = i };
input = new MyInput { value = 1 };
await session.RMWAsync(key, input, Empty.Default);
await session.RMWAsync(ref key, ref input, Empty.Default);
}

for (int i = 0; i < 2000; i++)
Expand All @@ -210,7 +213,7 @@ public async Task AsyncObjectDiskWriteRead()
var key = new MyKey { key = i };
var value = new MyValue { value = i };

result = await session.ReadAsync(key, input, Empty.Default);
result = await session.ReadAsync(ref key, ref input, Empty.Default);
Assert.IsTrue(result.Item1 == Status.OK);
if (i < 100 || i >= 1900)
Assert.IsTrue(result.Item2.value.value == value.value + 1);
Expand Down

0 comments on commit bd5ad88

Please sign in to comment.