
[C#] Implement Push iterators #817

Merged · 12 commits · Apr 20, 2023
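This PR migrates the samples from pull-style scan loops to push-style iteration: the caller supplies a struct of callbacks and FASTER drives the scan, invoking a reader callback per record. The sketch below shows the callback surface as it is used in the two samples in this diff; the member names and signatures are copied from the sample code, so treat it as an illustration of the shape rather than the authoritative interface definition.

// Illustrative only: the push scan-iterator callback shape, reconstructed from how the
// samples below implement IScanIteratorFunctions<Key, Value>; the real interface may differ in detail.
public interface IScanIteratorFunctions<Key, Value>
{
    // Called once before iteration begins; returning false cancels the scan.
    bool OnStart(long beginAddress, long endAddress);

    // Called once per record (mutable vs. immutable region); returning false stops iteration early.
    bool ConcurrentReader(ref Key key, ref Value value, RecordMetadata recordMetadata, long numberOfRecords);
    bool SingleReader(ref Key key, ref Value value, RecordMetadata recordMetadata, long numberOfRecords);

    // Called if the scan throws, and when iteration ends (completed is true if it ran to the end).
    void OnException(Exception exception, long numberOfRecords);
    void OnStop(bool completed, long numberOfRecords);
}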
93 changes: 30 additions & 63 deletions cs/samples/ReadAddress/VersionedReadApp.cs
@@ -63,9 +63,7 @@ static async Task<int> Main(string[] args)
await PopulateStore(store);

const int keyToScan = 42;
ScanStore(store, keyToScan);
var cts = new CancellationTokenSource();
await ScanStoreAsync(store, keyToScan, cts.Token);
IterateKeyVersions(store, keyToScan);

// Clean up
store.Dispose();
@@ -89,18 +87,14 @@ private static (FasterKV<Key, Value>, IDevice, string) CreateStore()
LogDevice = log,
ObjectLogDevice = new NullDevice(),
ReadCacheSettings = useReadCache ? new ReadCacheSettings() : null,
// Use small-footprint values
// Use small-footprint values to get some on-disk records
PageSizeBits = 12, // (4K pages)
MemorySizeBits = 20 // (1M memory for main log)
};

var store = new FasterKV<Key, Value>(
size: 1L << 20,
logSettings: logSettings,
var store = new FasterKV<Key, Value>(size: 1L << 20, logSettings: logSettings,
checkpointSettings: new CheckpointSettings { CheckpointDir = path },
serializerSettings: null,
comparer: new Key.Comparer()
);
serializerSettings: null, comparer: new Key.Comparer());
return (store, log, path);
}

@@ -141,71 +135,44 @@ private async static Task PopulateStore(FasterKV<Key, Value> store)
Console.WriteLine("Total time to upsert {0} elements: {1:0.000} secs ({2:0.00} inserts/sec)", numKeys, numSec, numKeys / numSec);
}

private static void ScanStore(FasterKV<Key, Value> store, int keyValue)
struct KeyIteratorFunctions : IScanIteratorFunctions<Key, Value>
{
// Start session with FASTER
using var session = store.For(new Functions()).NewSession<Functions>();
readonly FasterKV<Key, Value> store;
internal bool done;

Console.WriteLine($"Sync scanning records for key {keyValue}");
internal KeyIteratorFunctions(FasterKV<Key, Value> store) => this.store = store;

var output = default(Value);
var input = default(Value);
var key = new Key(keyValue);
ReadOptions readOptions = new() { CopyOptions = ReadCopyOptions.None };
for (int lap = 9; /* tested in loop */; --lap)
{
var status = session.Read(ref key, ref input, ref output, ref readOptions, out var recordMetadata, serialNo: maxLap + 1);
public bool OnStart(long beginAddress, long endAddress) => true;

// This will wait for each retrieved record; not recommended for performance-critical code or when retrieving multiple records unless necessary.
if (status.IsPending)
{
session.CompletePendingWithOutputs(out var completedOutputs, wait: true);
using (completedOutputs)
{
completedOutputs.Next();
recordMetadata = completedOutputs.Current.RecordMetadata;
status = completedOutputs.Current.Status;
}
}
public bool ConcurrentReader(ref Key key, ref Value value, RecordMetadata recordMetadata, long numberOfRecords)
=> SingleReader(ref key, ref value, recordMetadata, numberOfRecords);

if (!ProcessRecord(store, status, recordMetadata.RecordInfo, lap, ref output))
break;
public bool SingleReader(ref Key key, ref Value value, RecordMetadata recordMetadata, long numberOfRecords)
{
Debug.Assert(!done, "Iteration should not yet be complete");
if (maxLap - numberOfRecords == deleteLap)
Debug.Assert(recordMetadata.RecordInfo.Tombstone, "Expected deleted record");
else
Debug.Assert(!recordMetadata.RecordInfo.Tombstone, "Did not expect deleted record");
var output = recordMetadata.RecordInfo.Tombstone ? "<deleted>" : value.value.ToString();
Console.WriteLine($" {output}; PrevAddress: {recordMetadata.RecordInfo.PreviousAddress}");

readOptions.StartAddress = recordMetadata.RecordInfo.PreviousAddress;
// Check for end of loop
done = recordMetadata.RecordInfo.PreviousAddress < store.Log.BeginAddress;
return true;
}
}

private static async Task ScanStoreAsync(FasterKV<Key, Value> store, int keyValue, CancellationToken cancellationToken)
{
// Start session with FASTER
using var session = store.For(new Functions()).NewSession<Functions>();
public void OnException(Exception exception, long numberOfRecords) { }

Console.WriteLine($"Async scanning records for key {keyValue}");

var input = default(Value);
var key = new Key(keyValue);
RecordMetadata recordMetadata = default;
ReadOptions readOptions = new() { CopyOptions = ReadCopyOptions.None };
for (int lap = 9; /* tested in loop */; --lap)
{
var readAsyncResult = await session.ReadAsync(ref key, ref input, ref readOptions, default, serialNo: maxLap + 1, cancellationToken: cancellationToken);
cancellationToken.ThrowIfCancellationRequested();
var (status, output) = readAsyncResult.Complete(out recordMetadata);
if (!ProcessRecord(store, status, recordMetadata.RecordInfo, lap, ref output))
break;
readOptions.StartAddress = recordMetadata.RecordInfo.PreviousAddress;
}
public void OnStop(bool completed, long numberOfRecords) { }
}

private static bool ProcessRecord(FasterKV<Key, Value> store, Status status, RecordInfo recordInfo, int lap, ref Value output)
private static void IterateKeyVersions(FasterKV<Key, Value> store, int keyValue)
{
Debug.Assert(status.Found == !recordInfo.Tombstone);
Debug.Assert((lap == deleteLap) == recordInfo.Tombstone);
var value = recordInfo.Tombstone ? "<deleted>" : output.value.ToString();
Console.WriteLine($" {value}; PrevAddress: {recordInfo.PreviousAddress}");

// Check for end of loop
return recordInfo.PreviousAddress >= store.Log.BeginAddress;
KeyIteratorFunctions scanFunctions = new(store);
var key = new Key(keyValue);
// Keep the iteration call outside Debug.Assert so it still executes in Release builds.
bool iterated = store.Log.IterateKeyVersions(ref scanFunctions, ref key);
Debug.Assert(iterated, "Iteration did not complete successfully");
Debug.Assert(scanFunctions.done, "Iteration did not complete as expected");
}
}
}
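The SpanByteSample diff below shows the same migration in its simplest form: the pull-style iterator loop is replaced by a single push-style Scan call. A condensed before/after sketch, where Process is a hypothetical placeholder for the per-record work and everything else is taken from the diff:

// Before (pull): the caller drives the loop via GetNext().
using (IFasterScanIterator<SpanByte, SpanByte> iterator = store.Log.Scan(store.Log.BeginAddress, store.Log.TailAddress))
{
    while (iterator.GetNext(out RecordInfo recordInfo))
        Process(ref iterator.GetKey());              // Process is hypothetical
}

// After (push): FASTER drives the callbacks on a user-supplied struct.
ScanFunctions scanFunctions = new();                 // implements IScanIteratorFunctions<SpanByte, SpanByte>
bool completed = store.Log.Scan(ref scanFunctions, store.Log.BeginAddress, store.Log.TailAddress);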
89 changes: 50 additions & 39 deletions cs/samples/StoreVarLenTypes/SpanByteSample.cs
@@ -18,6 +18,24 @@ namespace StoreVarLenTypes
/// </summary>
public class SpanByteSample
{
// Functions for the push scan iterator.
internal struct ScanFunctions : IScanIteratorFunctions<SpanByte, SpanByte>
{
internal long count;

public bool OnStart(long beginAddress, long endAddress) => true;

public bool ConcurrentReader(ref SpanByte key, ref SpanByte value, RecordMetadata recordMetadata, long numberOfRecords)
=> SingleReader(ref key, ref value, recordMetadata, numberOfRecords);

public bool SingleReader(ref SpanByte key, ref SpanByte value, RecordMetadata recordMetadata, long numberOfRecords)
=> key.ToByteArray()[0] == count++;

public void OnException(Exception exception, long numberOfRecords) { }

public void OnStop(bool completed, long numberOfRecords) { }
}

public static void Run()
{
// VarLen types do not need an object log
@@ -40,8 +58,9 @@ public static void Run()
Span<byte> keyMem = stackalloc byte[1000];
Span<byte> valueMem = stackalloc byte[1000];

const int numRecords = 200;
byte i;
for (i = 0; i < 200; i++)
for (i = 0; i < numRecords; i++)
{
var keyLen = r.Next(1, 1000);
var key = keyMem.Slice(0, keyLen);
@@ -55,60 +74,52 @@ s.Upsert(key, value);
s.Upsert(key, value);
}

bool success = true;

i = 0;
using (IFasterScanIterator<SpanByte, SpanByte> iterator = store.Log.Scan(store.Log.BeginAddress, store.Log.TailAddress))
{
while (iterator.GetNext(out RecordInfo recordInfo))
{
ref var key = ref iterator.GetKey();
if (key.ToByteArray()[0] != i++)
{
success = false;
break;
}
}
}
ScanFunctions scanFunctions = new();
bool success = store.Log.Scan(ref scanFunctions, store.Log.BeginAddress, store.Log.TailAddress)
&& scanFunctions.count == numRecords;

if (i != 200)
if (!success)
{
success = false;
Console.WriteLine("SpanByteSample: Error on Scan!");
}

r = new Random(100);
for (i = 0; i < 200; i++)
else
{
var keyLen = r.Next(1, 1000);
Span<byte> key = keyMem.Slice(0, keyLen);
key.Fill(i);
r = new Random(100);
for (i = 0; i < numRecords; i++)
{
var keyLen = r.Next(1, 1000);
Span<byte> key = keyMem.Slice(0, keyLen);
key.Fill(i);

var valLen = r.Next(1, 1000);
var valLen = r.Next(1, 1000);

// Option 2: Converting fixed Span<byte> to SpanByte
var status = s.Read(SpanByte.FromFixedSpan(key), out byte[] output, userContext: (byte)valLen);
// Option 2: Converting fixed Span<byte> to SpanByte
var status = s.Read(SpanByte.FromFixedSpan(key), out byte[] output, userContext: (byte)valLen);

var expectedValue = valueMem.Slice(0, valLen);
expectedValue.Fill((byte)valLen);
var expectedValue = valueMem.Slice(0, valLen);
expectedValue.Fill((byte)valLen);

if (status.IsPending)
{
s.CompletePending(true);
}
else
{
if (!status.Found || (!output.SequenceEqual(expectedValue.ToArray())))
if (status.IsPending)
{
success = false;
s.CompletePendingWithOutputs(out var completedOutputs, wait: true);
using (completedOutputs)
{
success = completedOutputs.Next();
if (success)
(status, output) = (completedOutputs.Current.Status, completedOutputs.Current.Output);
}
}
success &= status.Found && output.SequenceEqual(expectedValue.ToArray());
if (!success)
{
Console.WriteLine("SpanByteSample: Error on Read!");
break;
}
}
}

if (success)
Console.WriteLine("SpanByteSample: Success!");
else
Console.WriteLine("Error!");

s.Dispose();
store.Dispose();