Skip to content

Commit

Permalink
User should be able to provide context class when using async methods (
Browse files Browse the repository at this point in the history
  • Loading branch information
ThiagoT1 committed Feb 5, 2020
1 parent 8f9cf2c commit 8f14cf4
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 20 deletions.
9 changes: 6 additions & 3 deletions cs/playground/FasterKVAsyncSample/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ static void Main()
Console.WriteLine($"Unobserved task exception: {e.Exception}");
e.SetObserved();
};

Task[] tasks = new Task[NumParallelTasks];
for (int i = 0; i < NumParallelTasks; i++)
{
Expand All @@ -67,6 +67,8 @@ static async Task AsyncOperator(int id)

await Task.Yield();

var context = new CacheContext();

if (!batched)
{
// Single commit version - upsert each item and wait for commit
Expand All @@ -76,7 +78,7 @@ static async Task AsyncOperator(int id)
{
try
{
await session.UpsertAsync(new CacheKey(rand.Next()), new CacheValue(rand.Next()), true);
await session.UpsertAsync(new CacheKey(rand.Next()), new CacheValue(rand.Next()), context, true);
Interlocked.Increment(ref numOps);
}
catch (Exception ex)
Expand All @@ -90,9 +92,10 @@ static async Task AsyncOperator(int id)
// Batched version - we enqueue many entries to memory,
// then wait for commit periodically
int count = 0;

while (true)
{
await session.UpsertAsync(new CacheKey(rand.Next()), new CacheValue(rand.Next()));
await session.UpsertAsync(new CacheKey(rand.Next()), new CacheValue(rand.Next()), context);
if (count++ % 100 == 0)
{
await session.WaitForCommitAsync();
Expand Down
17 changes: 8 additions & 9 deletions cs/src/core/ClientSession/ClientSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public sealed class ClientSession<Key, Value, Input, Output, Context, Functions>

internal ClientSession(
FasterKV<Key, Value, Input, Output, Context, Functions> fht,
FasterKV<Key, Value, Input, Output, Context, Functions>.FasterExecutionContext ctx,
FasterKV<Key, Value, Input, Output, Context, Functions>.FasterExecutionContext ctx,
bool supportAsync)
{
this.fht = fht;
Expand Down Expand Up @@ -84,14 +84,14 @@ public Status Read(ref Key key, ref Input input, ref Output output, Context user
/// </summary>
/// <param name="key"></param>
/// <param name="input"></param>
/// <param name="context"></param>
/// <param name="waitForCommit"></param>
/// <param name="token"></param>
/// <returns></returns>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public async ValueTask<(Status, Output)> ReadAsync(Key key, Input input, bool waitForCommit = false, CancellationToken token = default)
public async ValueTask<(Status, Output)> ReadAsync(Key key, Input input, Context context = default, bool waitForCommit = false, CancellationToken token = default)
{
Output output = default;
Context context = default;
var status = Read(ref key, ref input, ref output, context, ctx.serialNum + 1);
if (status == Status.PENDING)
return await CompletePendingReadAsync(ctx.serialNum, waitForCommit, token);
Expand Down Expand Up @@ -122,13 +122,13 @@ public Status Upsert(ref Key key, ref Value desiredValue, Context userContext, l
/// </summary>
/// <param name="key"></param>
/// <param name="desiredValue"></param>
/// <param name="context"></param>
/// <param name="waitForCommit"></param>
/// <param name="token"></param>
/// <returns></returns>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public async ValueTask UpsertAsync(Key key, Value desiredValue, bool waitForCommit = false, CancellationToken token = default)
public async ValueTask UpsertAsync(Key key, Value desiredValue, Context context = default, bool waitForCommit = false, CancellationToken token = default)
{
Context context = default;
var status = Upsert(ref key, ref desiredValue, context, ctx.serialNum + 1);
if (status == Status.PENDING)
await CompletePendingAsync(waitForCommit, token);
Expand Down Expand Up @@ -158,13 +158,13 @@ public Status RMW(ref Key key, ref Input input, Context userContext, long serial
/// </summary>
/// <param name="key"></param>
/// <param name="input"></param>
/// <param name="context"></param>
/// <param name="waitForCommit"></param>
/// <param name="token"></param>
/// <returns></returns>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public async ValueTask RMWAsync(Key key, Input input, bool waitForCommit = false, CancellationToken token = default)
public async ValueTask RMWAsync(Key key, Input input, Context context = default, bool waitForCommit = false, CancellationToken token = default)
{
Context context = default;
var status = RMW(ref key, ref input, context, ctx.serialNum + 1);
if (status == Status.PENDING)
await CompletePendingAsync(waitForCommit, token);
Expand Down Expand Up @@ -196,9 +196,8 @@ public Status Delete(ref Key key, Context userContext, long serialNo)
/// <param name="token"></param>
/// <returns></returns>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public async ValueTask DeleteAsync(Key key, bool waitForCommit = false, CancellationToken token = default)
public async ValueTask DeleteAsync(Key key, Context context = default, bool waitForCommit = false, CancellationToken token = default)
{
Context context = default;
var status = Delete(ref key, context, ctx.serialNum + 1);
if (status == Status.PENDING)
await CompletePendingAsync(waitForCommit, token);
Expand Down
16 changes: 8 additions & 8 deletions cs/test/ObjectFASTERTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ internal class ObjectFASTERTests
{
private FasterKV<MyKey, MyValue, MyInput, MyOutput, Empty, MyFunctions> fht;
private IDevice log, objlog;

[SetUp]
public void Setup()
{
Expand Down Expand Up @@ -170,38 +170,38 @@ public async Task AsyncObjectDiskWriteRead()
{
var key = new MyKey { key = i };
var value = new MyValue { value = i };
await session.UpsertAsync(key, value);
await session.UpsertAsync(key, value, Empty.Default);
}

var key1 = new MyKey { key = 1989 };
var input = new MyInput();
var result = await session.ReadAsync(key1, input);
var result = await session.ReadAsync(key1, input, Empty.Default);
Assert.IsTrue(result.Item1 == Status.OK);
Assert.IsTrue(result.Item2.value.value == 1989);

var key2 = new MyKey { key = 23 };
result = await session.ReadAsync(key2, input);
result = await session.ReadAsync(key2, input, Empty.Default);
Assert.IsTrue(result.Item1 == Status.OK);
Assert.IsTrue(result.Item2.value.value == 23);

var key3 = new MyKey { key = 9999 };
result = await session.ReadAsync(key3, input);
result = await session.ReadAsync(key3, input, Empty.Default);
Assert.IsTrue(result.Item1 == Status.NOTFOUND);

// Update last 100 using RMW in memory
for (int i = 1900; i < 2000; i++)
{
var key = new MyKey { key = i };
input = new MyInput { value = 1 };
await session.RMWAsync(key, input);
await session.RMWAsync(key, input, Empty.Default);
}

// Update first 100 using RMW from storage
for (int i = 0; i < 100; i++)
{
var key = new MyKey { key = i };
input = new MyInput { value = 1 };
await session.RMWAsync(key, input);
await session.RMWAsync(key, input, Empty.Default);
}

for (int i = 0; i < 2000; i++)
Expand All @@ -210,7 +210,7 @@ public async Task AsyncObjectDiskWriteRead()
var key = new MyKey { key = i };
var value = new MyValue { value = i };

result = await session.ReadAsync(key, input);
result = await session.ReadAsync(key, input, Empty.Default);
Assert.IsTrue(result.Item1 == Status.OK);
if (i < 100 || i >= 1900)
Assert.IsTrue(result.Item2.value.value == value.value + 1);
Expand Down

0 comments on commit 8f14cf4

Please sign in to comment.