Skip to content

Commit

Permalink
Major API cleanup and porting to session-based interface
Browse files Browse the repository at this point in the history
  • Loading branch information
badrishc committed Dec 9, 2019
1 parent b3cccd0 commit 1eca340
Show file tree
Hide file tree
Showing 68 changed files with 1,347 additions and 1,537 deletions.
12 changes: 6 additions & 6 deletions cs/benchmark/FasterYcsbBenchmark.cs
Original file line number Diff line number Diff line change
Expand Up @@ -101,9 +101,9 @@ private void RunYcsb(int thread_idx)
sw.Start();


Value value = default(Value);
Input input = default(Input);
Output output = default(Output);
Value value = default;
Input input = default;
Output output = default;

long reads_done = 0;
long writes_done = 0;
Expand All @@ -115,7 +115,7 @@ private void RunYcsb(int thread_idx)
int count = 0;
#endif

var session = store.StartClientSession(false);
var session = store.NewSession(null, false);

while (!done)
{
Expand Down Expand Up @@ -317,7 +317,7 @@ private void SetupYcsb(int thread_idx)
else
Native32.AffinitizeThreadShardedNuma((uint)thread_idx, 2); // assuming two NUMA sockets

var session = store.StartClientSession(false);
var session = store.NewSession(null, false);

#if DASHBOARD
var tstart = Stopwatch.GetTimestamp();
Expand All @@ -326,7 +326,7 @@ private void SetupYcsb(int thread_idx)
int count = 0;
#endif

Value value = default(Value);
Value value = default;

for (long chunk_idx = Interlocked.Add(ref idx_, kChunkSize) - kChunkSize;
chunk_idx < kInitCount;
Expand Down
2 changes: 1 addition & 1 deletion cs/benchmark/Functions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public void DeleteCompletionCallback(ref Key key, Empty ctx)
{
}

public void CheckpointCompletionCallback(Guid sessionId, CommitPoint commitPoint)
public void CheckpointCompletionCallback(string sessionId, CommitPoint commitPoint)
{
Debug.WriteLine("Session {0} reports persistence until {1}", sessionId, commitPoint.UntilSerialNo);
}
Expand Down
36 changes: 16 additions & 20 deletions cs/playground/ClassCache/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ class Program
static readonly bool useReadCache = true;
const int max = 1000000;

static void Main(string[] args)
static void Main()
{
// This sample shows the use of FASTER as a cache + key-value store for
// C# objects.
Expand All @@ -45,26 +45,22 @@ static void Main(string[] args)
);

// Thread starts session with FASTER
h.StartSession();
var s = h.NewSession();

Console.WriteLine("Writing keys from 0 to {0} to FASTER", max);

Stopwatch sw = new Stopwatch();
sw.Start();
for (int i = 0; i < max; i++)
{
if (i % 256 == 0)
if (i % (1<<19) == 0)
{
h.Refresh();
if (i % (1<<19) == 0)
{
long workingSet = Process.GetCurrentProcess().WorkingSet64;
Console.WriteLine($"{i}: {workingSet / 1048576}M");
}
long workingSet = Process.GetCurrentProcess().WorkingSet64;
Console.WriteLine($"{i}: {workingSet / 1048576}M");
}
var key = new CacheKey(i);
var value = new CacheValue(i);
h.Upsert(ref key, ref value, context, 0);
s.Upsert(ref key, ref value, context, 0);
}
sw.Stop();
Console.WriteLine("Total time to upsert {0} elements: {1:0.000} secs ({2:0.00} inserts/sec)", max, sw.ElapsedMilliseconds/1000.0, max / (sw.ElapsedMilliseconds / 1000.0));
Expand All @@ -86,12 +82,12 @@ static void Main(string[] args)
var workload = int.Parse(Console.ReadLine());

if (workload == 0)
RandomReadWorkload(h, max);
RandomReadWorkload(s, max);
else
InteractiveReadWorkload(h, max);
InteractiveReadWorkload(s);

// Stop session and clean up
h.StopSession();
s.Dispose();
h.Dispose();
log.Close();
objlog.Close();
Expand All @@ -100,7 +96,7 @@ static void Main(string[] args)
Console.ReadLine();
}

private static void RandomReadWorkload(FasterKV<CacheKey, CacheValue, CacheInput, CacheOutput, CacheContext, CacheFunctions> h, int max)
private static void RandomReadWorkload(ClientSession<CacheKey, CacheValue, CacheInput, CacheOutput, CacheContext, CacheFunctions> s, int max)
{
Console.WriteLine("Issuing uniform random read workload of {0} reads", max);

Expand All @@ -118,14 +114,14 @@ private static void RandomReadWorkload(FasterKV<CacheKey, CacheValue, CacheInput
long k = rnd.Next(max);

var key = new CacheKey(k);
var status = h.Read(ref key, ref input, ref output, context, 0);
var status = s.Read(ref key, ref input, ref output, context, 0);

switch (status)
{
case Status.PENDING:
statusPending++;
if (statusPending % 1000 == 0)
h.CompletePending(false);
s.CompletePending(false);
break;
case Status.OK:
if (output.value.value != key.key)
Expand All @@ -135,13 +131,13 @@ private static void RandomReadWorkload(FasterKV<CacheKey, CacheValue, CacheInput
throw new Exception("Error!");
}
}
h.CompletePending(true);
s.CompletePending(true);
sw.Stop();
Console.WriteLine("Total time to read {0} elements: {1:0.000} secs ({2:0.00} reads/sec)", max, sw.ElapsedMilliseconds / 1000.0, max / (sw.ElapsedMilliseconds / 1000.0));
Console.WriteLine($"Reads completed with PENDING: {statusPending}");
}

private static void InteractiveReadWorkload(FasterKV<CacheKey, CacheValue, CacheInput, CacheOutput, CacheContext, CacheFunctions> h, int max)
private static void InteractiveReadWorkload(ClientSession<CacheKey, CacheValue, CacheInput, CacheOutput, CacheContext, CacheFunctions> s)
{
Console.WriteLine("Issuing interactive read workload");

Expand All @@ -158,11 +154,11 @@ private static void InteractiveReadWorkload(FasterKV<CacheKey, CacheValue, Cache
var key = new CacheKey(k);

context.ticks = DateTime.Now.Ticks;
var status = h.Read(ref key, ref input, ref output, context, 0);
var status = s.Read(ref key, ref input, ref output, context, 0);
switch (status)
{
case Status.PENDING:
h.CompletePending(true);
s.CompletePending(true);
break;
case Status.OK:
long ticks = DateTime.Now.Ticks - context.ticks;
Expand Down
2 changes: 1 addition & 1 deletion cs/playground/ClassCache/Types.cs
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ public bool InPlaceUpdater(ref CacheKey key, ref CacheInput input, ref CacheValu
throw new NotImplementedException();
}

public void CheckpointCompletionCallback(Guid sessionId, CommitPoint commitPoint)
public void CheckpointCompletionCallback(string sessionId, CommitPoint commitPoint)
{
throw new NotImplementedException();
}
Expand Down
32 changes: 16 additions & 16 deletions cs/playground/ClassCacheMT/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ class Program
static FasterKV<CacheKey, CacheValue, CacheInput, CacheOutput, CacheContext, CacheFunctions>[] h;
static long totalReads = 0;

static void Main(string[] args)
static void Main()
{
// This sample shows the use of FASTER as a multi-threaded (MT) cache + key-value store for
// C# objects. Number of caches and number of threads can be varied.
Expand Down Expand Up @@ -62,31 +62,28 @@ static void Main(string[] args)
new SerializerSettings<CacheKey, CacheValue> { keySerializer = () => new CacheKeySerializer(), valueSerializer = () => new CacheValueSerializer() },
null
);
h[ht].StartSession();
var session = h[ht].NewSession();
Console.WriteLine("Table {0}:", ht);
Console.WriteLine(" Writing keys from 0 to {0} to FASTER", max);

Stopwatch sw = new Stopwatch();
sw.Start();
for (int i = 0; i < max; i++)
{
if (i % 256 == 0)
if (i % (1 << 19) == 0)
{
h[ht].Refresh();
if (i % (1 << 19) == 0)
{
// long workingSet = Process.GetCurrentProcess().WorkingSet64;
// Console.WriteLine($"{i}: {workingSet / 1048576}M");
}
// long workingSet = Process.GetCurrentProcess().WorkingSet64;
// Console.WriteLine($"{i}: {workingSet / 1048576}M");
}

var key = new CacheKey(i);
var value = new CacheValue(i);
h[ht].Upsert(ref key, ref value, context, 0);
session.Upsert(ref key, ref value, context, 0);
}
sw.Stop();
Console.WriteLine(" Total time to upsert {0} elements: {1:0.000} secs ({2:0.00} inserts/sec)", max, sw.ElapsedMilliseconds / 1000.0, max / (sw.ElapsedMilliseconds / 1000.0));
h[ht].Log.DisposeFromMemory();
h[ht].StopSession();
session.Dispose();
}

ContinuousRandomReadWorkload();
Expand Down Expand Up @@ -136,8 +133,11 @@ private static void RandomReadWorkload(int threadid)
{
Console.WriteLine("Issuing uniform random read workload of {0} reads from thread {1}", max, threadid);

ClientSession<CacheKey, CacheValue, CacheInput, CacheOutput, CacheContext, CacheFunctions>[]
sessions = new ClientSession<CacheKey, CacheValue, CacheInput, CacheOutput, CacheContext, CacheFunctions>[kNumTables];

for (int ht = 0; ht < kNumTables; ht++)
h[ht].StartSession();
sessions[ht] = h[ht].NewSession();

var rnd = new Random(threadid);

Expand All @@ -154,14 +154,14 @@ private static void RandomReadWorkload(int threadid)
if (i > 0 && (i % 256 == 0))
{
for (int htcnt = 0; htcnt < kNumTables; htcnt++)
h[htcnt].CompletePending(false);
sessions[htcnt].CompletePending(false);
Interlocked.Add(ref totalReads, 256);
}
long k = rnd.Next(max);

var ht = h[rnd.Next(kNumTables)];
var hts = sessions[rnd.Next(kNumTables)];
var key = new CacheKey(k);
var status = ht.Read(ref key, ref input, ref output, context, 0);
var status = hts.Read(ref key, ref input, ref output, context, 0);

switch (status)
{
Expand All @@ -183,7 +183,7 @@ private static void RandomReadWorkload(int threadid)
Console.WriteLine($"Reads completed with PENDING: {statusPending}");
for (int ht = 0; ht < kNumTables; ht++)
h[ht].StopSession();
sessions[ht].Dispose();
*/
}
}
Expand Down
2 changes: 1 addition & 1 deletion cs/playground/ClassCacheMT/Types.cs
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ public bool InPlaceUpdater(ref CacheKey key, ref CacheInput input, ref CacheValu
throw new NotImplementedException();
}

public void CheckpointCompletionCallback(Guid sessionId, CommitPoint commitPoint)
public void CheckpointCompletionCallback(string sessionId, CommitPoint commitPoint)
{
throw new NotImplementedException();
}
Expand Down
20 changes: 10 additions & 10 deletions cs/playground/ClassSample/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -87,12 +87,12 @@ public void ReadCompletionCallback(ref MyKey key, ref MyInput input, ref MyOutpu
public void UpsertCompletionCallback(ref MyKey key, ref MyValue value, MyContext ctx) { }
public void RMWCompletionCallback(ref MyKey key, ref MyInput input, MyContext ctx, Status status) { }
public void DeleteCompletionCallback(ref MyKey key, MyContext ctx) { }
public void CheckpointCompletionCallback(Guid sessionId, CommitPoint commitPoint) { }
public void CheckpointCompletionCallback(string sessionId, CommitPoint commitPoint) { }
}

class Program
{
static void Main(string[] args)
static void Main()
{
// This sample uses class key and value types, which are not blittable (i.e., they
// require a pointer to heap objects). Such datatypes include variable length types
Expand All @@ -116,21 +116,21 @@ static void Main(string[] args)
var context = default(MyContext);

// Each thread calls StartSession to register itself with FASTER
h.StartSession();
var s = h.NewSession();

for (int i = 0; i < 20000; i++)
{
var _key = new MyKey { key = i };
var value = new MyValue { value = i };
h.Upsert(ref _key, ref value, context, 0);
s.Upsert(ref _key, ref value, context, 0);

// Each thread calls Refresh periodically for thread coordination
if (i % 1024 == 0) h.Refresh();
if (i % 1024 == 0) s.Refresh();
}
var key = new MyKey { key = 23 };
var input = default(MyInput);
MyOutput g1 = new MyOutput();
var status = h.Read(ref key, ref input, ref g1, context, 0);
var status = s.Read(ref key, ref input, ref g1, context, 0);

if (status == Status.OK && g1.value.value == key.key)
Console.WriteLine("Success!");
Expand All @@ -139,7 +139,7 @@ static void Main(string[] args)

MyOutput g2 = new MyOutput();
key = new MyKey { key = 46 };
status = h.Read(ref key, ref input, ref g2, context, 0);
status = s.Read(ref key, ref input, ref g2, context, 0);

if (status == Status.OK && g2.value.value == key.key)
Console.WriteLine("Success!");
Expand All @@ -148,15 +148,15 @@ static void Main(string[] args)

/// Delete key, read to verify deletion
var output = new MyOutput();
h.Delete(ref key, context, 0);
status = h.Read(ref key, ref input, ref output, context, 0);
s.Delete(ref key, context, 0);
status = s.Read(ref key, ref input, ref output, context, 0);
if (status == Status.NOTFOUND)
Console.WriteLine("Success!");
else
Console.WriteLine("Error!");

// Each thread ends session when done
h.StopSession();
s.Dispose();

// Dispose FASTER instance and log
h.Dispose();
Expand Down
2 changes: 1 addition & 1 deletion cs/playground/FixedLenStructSample/Functions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ namespace FixedLenStructSample
/// </summary>
public class FixedLenFunctions : IFunctions<FixedLenKey, FixedLenValue, string, string, Empty>
{
public void CheckpointCompletionCallback(Guid sessionId, CommitPoint commitPoint)
public void CheckpointCompletionCallback(string sessionId, CommitPoint commitPoint)
{
}

Expand Down
11 changes: 6 additions & 5 deletions cs/playground/FixedLenStructSample/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,13 @@ static void Main()
(128, new FixedLenFunctions(),
new LogSettings { LogDevice = log, MemorySizeBits = 17, PageSizeBits = 12 }
);
fht.StartSession();

var session = fht.NewSession();

var key = new FixedLenKey("foo");
var value = new FixedLenValue("bar");

var status = fht.Upsert(ref key, ref value, Empty.Default, 0);
var status = session.Upsert(ref key, ref value, Empty.Default, 0);

if (status != Status.OK)
Console.WriteLine("FixedLenStructSample: Error!");
Expand All @@ -30,13 +31,13 @@ static void Main()
var output = default(string);

key = new FixedLenKey("xyz");
status = fht.Read(ref key, ref input, ref output, Empty.Default, 0);
status = session.Read(ref key, ref input, ref output, Empty.Default, 0);

if (status != Status.NOTFOUND)
Console.WriteLine("FixedLenStructSample: Error!");

key = new FixedLenKey("foo");
status = fht.Read(ref key, ref input, ref output, Empty.Default, 0);
status = session.Read(ref key, ref input, ref output, Empty.Default, 0);

if (status != Status.OK)
Console.WriteLine("FixedLenStructSample: Error!");
Expand All @@ -46,7 +47,7 @@ static void Main()
else
Console.WriteLine("FixedLenStructSample: Error!");

fht.StopSession();
session.Dispose();
fht.Dispose();
log.Close();

Expand Down
1 change: 1 addition & 0 deletions cs/playground/PeriodicCompaction/PeriodicCompaction.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
<TargetFramework>net46</TargetFramework>
<Platforms>x64</Platforms>
<RuntimeIdentifier>win7-x64</RuntimeIdentifier>
<LangVersion>preview</LangVersion>
</PropertyGroup>

<ItemGroup>
Expand Down
Loading

0 comments on commit 1eca340

Please sign in to comment.