Skip to content

Commit

Permalink
Merging from fasterlog
Browse files Browse the repository at this point in the history
  • Loading branch information
badrishc committed Oct 7, 2019
2 parents 089d545 + 5caea66 commit e0f745f
Show file tree
Hide file tree
Showing 7 changed files with 193 additions and 364 deletions.
253 changes: 126 additions & 127 deletions cs/playground/FasterLogSample/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,133 +14,9 @@ public class Program
// Entry length can be between 1 and ((1 << FasterLogSettings.PageSizeBits) - 4)
const int entryLength = 1 << 10;
static readonly byte[] staticEntry = new byte[entryLength];
static readonly ReadOnlySpanBatch spanBatch = new ReadOnlySpanBatch(10);
static FasterLog log;
static FasterLogScanIterator iter;

static void ReportThread()
{
long lastTime = 0;
long lastValue = log.TailAddress;
long lastIterValue = log.BeginAddress;

Stopwatch sw = new Stopwatch();
sw.Start();

while (true)
{
Thread.Sleep(5000);

var nowTime = sw.ElapsedMilliseconds;
var nowValue = log.TailAddress;

Console.WriteLine("Append Throughput: {0} MB/sec, Tail: {1}",
(nowValue - lastValue) / (1000 * (nowTime - lastTime)), nowValue);
lastValue = nowValue;

if (iter != null)
{
var nowIterValue = iter.CurrentAddress;
Console.WriteLine("Scan Throughput: {0} MB/sec, Iter pos: {1}",
(nowIterValue - lastIterValue) / (1000 * (nowTime - lastTime)), nowIterValue);
lastIterValue = nowIterValue;
}

lastTime = nowTime;
}
}

static void CommitThread()
{
while (true)
{
Thread.Sleep(5);
log.FlushAndCommit(true);

// Async version
// await Task.Delay(5);
// await log.FlushAndCommitAsync();
}
}

static void AppendThread()
{
while (true)
{
// TryAppend - can be used with throttling/back-off
// Accepts byte[] and ReadOnlySpan<byte>
while (!log.TryAppend(staticEntry, out _)) ;

// Synchronous blocking append
// Accepts byte[] and ReadOnlySpan<byte>
// log.Append(entry);

// Batched append - batch must fit on one page
// while (!log.TryAppend(spanBatch, out _)) ;
}
}

static void ScanThread()
{
Random r = new Random();

byte[] entry = new byte[entryLength];
for (int i = 0; i < entryLength; i++)
{
entry[i] = (byte)i;
}

var entrySpan = new Span<byte>(entry);

long lastAddress = 0;
Span<byte> result;
using (iter = log.Scan(log.BeginAddress, long.MaxValue))
{
while (true)
{
while (!iter.GetNext(out result, out int length))
{
iter.WaitAsync().GetAwaiter().GetResult();
}

// Memory pool variant:
// iter.GetNext(pool, out IMemoryOwner<byte> resultMem, out int length))

if (!result.SequenceEqual(entrySpan))
{
if (result.Length != entrySpan.Length)
throw new Exception("Invalid entry found, expected length " + entrySpan.Length + ", actual length " + result.Length);
else
throw new Exception("Invalid entry found at offset " + FindDiff(result, entrySpan));
}

// Re-insert entry with small probability
if (r.Next(100) < 10)
{
log.Append(result);
}

if (iter.CurrentAddress - lastAddress > 500_000_000)
{
log.TruncateUntil(iter.CurrentAddress);
lastAddress = iter.CurrentAddress;
}
}
}
}

private static int FindDiff(Span<byte> b1, Span<byte> b2)
{
for (int i = 0; i < b1.Length; i++)
{
if (b1[i] != b2[i])
{
return i;
}
}
return 0;
}

/// <summary>
/// Main program entry point
/// </summary>
Expand Down Expand Up @@ -200,6 +76,27 @@ static void Main(string[] args)
}
}


static void AppendThread()
{
while (true)
{
// TryAppend - can be used with throttling/back-off
// Accepts byte[] and ReadOnlySpan<byte>
while (!log.TryAppend(staticEntry, out _)) ;

// Synchronous blocking append
// Accepts byte[] and ReadOnlySpan<byte>
// log.Append(entry);

// Batched append - batch must fit on one page
// while (!log.TryAppend(spanBatch, out _)) ;
}
}

/// <summary>
/// Async version of append
/// </summary>
static async Task AppendAsync(int id)
{
bool batched = false;
Expand All @@ -208,7 +105,7 @@ static async Task AppendAsync(int id)

if (!batched)
{
// Unbatched version - append each item with commit
// Single commit version - append each item with commit
// Needs high parallelism (NumParallelTasks) for perf
while (true)
{
Expand All @@ -224,7 +121,7 @@ static async Task AppendAsync(int id)
}
else
{
// Batched version - we append many entries to memory,
// Group-commit version - we append many entries to memory,
// then wait for commit periodically
int count = 0;
while (true)
Expand All @@ -238,13 +135,115 @@ static async Task AppendAsync(int id)
}
}

static void ScanThread()
{
Random r = new Random();

long lastAddress = 0;
byte[] result;
using (iter = log.Scan(log.BeginAddress, long.MaxValue))
{
while (true)
{
while (!iter.GetNext(out result, out int length))
{
iter.WaitAsync().GetAwaiter().GetResult();
}

// Memory pool variant:
// iter.GetNext(pool, out IMemoryOwner<byte> resultMem, out int length))

if (Different(result, staticEntry, out int location))
{
if (result.Length != staticEntry.Length)
throw new Exception("Invalid entry found, expected length " + staticEntry.Length + ", actual length " + result.Length);
else
throw new Exception("Invalid entry found at offset " + location);
}

// Re-insert entry with small probability
if (r.Next(100) < 10)
{
log.Append(result);
}

if (iter.CurrentAddress - lastAddress > 500_000_000)
{
log.TruncateUntil(iter.CurrentAddress);
lastAddress = iter.CurrentAddress;
}
}
}
}

static void ReportThread()
{
long lastTime = 0;
long lastValue = log.TailAddress;
long lastIterValue = log.BeginAddress;

Stopwatch sw = new Stopwatch();
sw.Start();

while (true)
{
Thread.Sleep(5000);

var nowTime = sw.ElapsedMilliseconds;
var nowValue = log.TailAddress;

Console.WriteLine("Append Throughput: {0} MB/sec, Tail: {1}",
(nowValue - lastValue) / (1000 * (nowTime - lastTime)), nowValue);
lastValue = nowValue;

if (iter != null)
{
var nowIterValue = iter.CurrentAddress;
Console.WriteLine("Scan Throughput: {0} MB/sec, Iter pos: {1}",
(nowIterValue - lastIterValue) / (1000 * (nowTime - lastTime)), nowIterValue);
lastIterValue = nowIterValue;
}

lastTime = nowTime;
}
}

static void CommitThread()
{
while (true)
{
Thread.Sleep(5);
log.FlushAndCommit(true);

// Async version
// await Task.Delay(5);
// await log.FlushAndCommitAsync();
}
}

private static bool Different(byte[] b1, byte[] b2, out int location)
{
location = 0;
if (b1.Length != b2.Length) return true;
for (location = 0; location < b1.Length; location++)
{
if (b1[location] != b2[location])
{
return true;
}
}
return false;
}

// For batch append API
static readonly ReadOnlySpanBatch spanBatch = new ReadOnlySpanBatch(10);

private struct ReadOnlySpanBatch : IReadOnlySpanBatch
{
private readonly int batchSize;
public ReadOnlySpanBatch(int batchSize) => this.batchSize = batchSize;
public ReadOnlySpan<byte> Get(int index) => staticEntry;
public int TotalEntries() => batchSize;
}

}
}
Loading

0 comments on commit e0f745f

Please sign in to comment.