Skip to content

Commit

Permalink
Merge branch 'master' into async-support
Browse files Browse the repository at this point in the history
  • Loading branch information
badrishc committed Nov 20, 2019
2 parents ca15128 + ff51d3c commit 0f65fbb
Show file tree
Hide file tree
Showing 8 changed files with 479 additions and 356 deletions.
147 changes: 71 additions & 76 deletions cs/playground/FasterLogSample/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

using System;
using System.Diagnostics;
using System.Runtime.InteropServices;
using System.Threading;
using System.Threading.Tasks;
using FASTER.core;
Expand All @@ -23,53 +24,67 @@ public class Program
static void Main()
{
bool sync = true;
var device = Devices.CreateLogDevice("D:\\logs\\hlog.log");
log = new FasterLog(new FasterLogSettings { LogDevice = device });

// Populate entry being inserted
for (int i = 0; i < entryLength; i++)
{
staticEntry[i] = (byte)i;
}

if (sync)
{
// Log writer thread: create as many as needed
new Thread(new ThreadStart(LogWriterThread)).Start();
IDevice device;

// Threads for scan, reporting, commit
new Thread(new ThreadStart(ScanThread)).Start();
new Thread(new ThreadStart(ReportThread)).Start();
new Thread(new ThreadStart(CommitThread)).Start();
}
if (RuntimeInformation.IsOSPlatform(OSPlatform.Windows))
device = Devices.CreateLogDevice("D:\\logs\\hlog.log");
else
{
// Async version of demo: expect lower performance
// particularly for small payload sizes
device = Devices.CreateLogDevice("/mnt/tmp/hlog/hlog.log");

const int NumParallelTasks = 10_000;
ThreadPool.SetMinThreads(2 * Environment.ProcessorCount, 2 * Environment.ProcessorCount);
TaskScheduler.UnobservedTaskException += (object sender, UnobservedTaskExceptionEventArgs e) =>
{
Console.WriteLine($"Unobserved task exception: {e.Exception}");
e.SetObserved();
};
log = new FasterLog(new FasterLogSettings { LogDevice = device });

Task[] tasks = new Task[NumParallelTasks];
for (int i = 0; i < NumParallelTasks; i++)
using (iter = log.Scan(log.BeginAddress, long.MaxValue))
{
if (sync)
{
int local = i;
tasks[i] = Task.Run(() => AsyncLogWriter(local));
// Log writer thread: create as many as needed
new Thread(new ThreadStart(LogWriterThread)).Start();

// Threads for iterator scan: create as many as needed
new Thread(() => ScanThread()).Start();

// Threads for reporting, commit
new Thread(new ThreadStart(ReportThread)).Start();
var t = new Thread(new ThreadStart(CommitThread));
t.Start();
t.Join();
}
else
{
// Async version of demo: expect lower performance
// particularly for small payload sizes

var scan = Task.Run(() => AsyncScan());
const int NumParallelTasks = 10_000;
ThreadPool.SetMinThreads(2 * Environment.ProcessorCount, 2 * Environment.ProcessorCount);
TaskScheduler.UnobservedTaskException += (object sender, UnobservedTaskExceptionEventArgs e) =>
{
Console.WriteLine($"Unobserved task exception: {e.Exception}");
e.SetObserved();
};

Task[] tasks = new Task[NumParallelTasks];
for (int i = 0; i < NumParallelTasks; i++)
{
int local = i;
tasks[i] = Task.Run(() => AsyncLogWriter(local));
}

// Threads for reporting, commit
new Thread(new ThreadStart(ReportThread)).Start();
new Thread(new ThreadStart(CommitThread)).Start();
var scan = Task.Run(() => AsyncScan());

Task.WaitAll(tasks);
Task.WaitAll(scan);
// Threads for reporting, commit
new Thread(new ThreadStart(ReportThread)).Start();
new Thread(new ThreadStart(CommitThread)).Start();

Task.WaitAll(tasks);
Task.WaitAll(scan);
}
}
}

Expand Down Expand Up @@ -138,41 +153,31 @@ static async Task AsyncLogWriter(int id)

static void ScanThread()
{
Random r = new Random();
byte[] result;

using (iter = log.Scan(log.BeginAddress, long.MaxValue))
while (true)
{
while (true)
while (!iter.GetNext(out result, out _, out _))
{
while (!iter.GetNext(out result, out int length))
{
// For finite end address, check if iteration ended
// if (iter.CurrentAddress >= endAddress) return;
iter.WaitAsync().GetAwaiter().GetResult();
}
// For finite end address, check if iteration ended
// if (iter.CurrentAddress >= endAddress) return;
iter.WaitAsync().GetAwaiter().GetResult();
}

// Memory pool variant:
// iter.GetNext(pool, out IMemoryOwner<byte> resultMem, out int length))
// Memory pool variant:
// iter.GetNext(pool, out IMemoryOwner<byte> resultMem, out int length, out long currentAddress)

if (Different(result, staticEntry, out int location))
throw new Exception("Invalid entry found");
if (Different(result, staticEntry))
throw new Exception("Invalid entry found");

// Re-insert entry with small probability
if (r.Next(100) < 10)
{
log.Enqueue(result);
}

// Example of random read from given address
// (result, _) = log.ReadAsync(iter.CurrentAddress).GetAwaiter().GetResult();
// Example of random read from given address
// (result, _) = log.ReadAsync(iter.CurrentAddress).GetAwaiter().GetResult();

// Truncate log until after recently processed entry
log.TruncateUntil(iter.NextAddress);
// Truncate until start of most recently read page
log.TruncateUntilPageStart(iter.NextAddress);

// Safer truncate variant: truncate until start of page
// log.TruncateUntilPageStart(iter.NextAddress);
}
// Truncate log until after most recently read entry
// log.TruncateUntil(iter.NextAddress);
}

// Example of recoverable (named) iterator:
Expand All @@ -181,13 +186,12 @@ static void ScanThread()

static async Task AsyncScan()
{
using (iter = log.Scan(log.BeginAddress, long.MaxValue))
await foreach ((byte[] result, int length) in iter.GetAsyncEnumerable())
{
if (Different(result, staticEntry, out int location))
throw new Exception("Invalid entry found");
log.TruncateUntil(iter.NextAddress);
}
await foreach ((byte[] result, int length) in iter.GetAsyncEnumerable())
{
if (Different(result, staticEntry))
throw new Exception("Invalid entry found");
log.TruncateUntilPageStart(iter.NextAddress);
}
}

static void ReportThread()
Expand All @@ -212,7 +216,7 @@ static void ReportThread()

if (iter != null)
{
var nowIterValue = iter.CurrentAddress;
var nowIterValue = iter.NextAddress;
Console.WriteLine("Scan Throughput: {0} MB/sec, Iter pos: {1}",
(nowIterValue - lastIterValue) / (1000 * (nowTime - lastTime)), nowIterValue);
lastIterValue = nowIterValue;
Expand Down Expand Up @@ -246,18 +250,9 @@ static void CommitThread()
}
}

private static bool Different(byte[] b1, byte[] b2, out int location)
private static bool Different(ReadOnlySpan<byte> b1, ReadOnlySpan<byte> b2)
{
location = 0;
if (b1.Length != b2.Length) return true;
for (location = 0; location < b1.Length; location++)
{
if (b1[location] != b2[location])
{
return true;
}
}
return false;
return !b1.SequenceEqual(b2);
}

private struct ReadOnlySpanBatch : IReadOnlySpanBatch
Expand Down
96 changes: 96 additions & 0 deletions cs/src/core/Device/FixedPool.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT license.

using System;
using System.Threading;

namespace FASTER.core
{
class FixedPool<T> : IDisposable where T : IDisposable
{
readonly T[] items;
readonly int[] owners;
readonly int size;
readonly Func<T> creator;
bool disposed = false;

public FixedPool(int size, Func<T> creator)
{
items = new T[size];
owners = new int[size];
this.size = size;
this.creator = creator;
}

public (T, int) Get()
{
while (true)
{
for (int i=0; i<size; i++)
{
if (disposed)
throw new Exception("Disposed");

var val = owners[i];
if (val == 0)
{
if (Interlocked.CompareExchange(ref owners[i], 2, val) == val)
{
items[i] = creator();
return (items[i], i);
}
}
else if (val == 1)
{
if (Interlocked.CompareExchange(ref owners[i], 2, val) == val)
{
return (items[i], i);
}
}
}
}
}

public void Return(int offset)
{
if (!disposed)
Interlocked.CompareExchange(ref owners[offset], 1, 2);
else
{
if (Interlocked.CompareExchange(ref owners[offset], -1, 2) == 2)
items[offset].Dispose();
}
}

public void Dispose()
{
disposed = true;
bool done = false;

while (!done)
{
done = true;

for (int i = 0; i < size; i++)
{
var val = owners[i];
if (val == 0)
{
if (Interlocked.CompareExchange(ref owners[i], -1, val) != val)
done = false;
}
else if (val == 1)
{
done = false;
if (Interlocked.CompareExchange(ref owners[i], -1, val) == val)
items[i].Dispose();
}
else if (val == 2)
{
done = false;
}
}
}
}
}
}
Loading

0 comments on commit 0f65fbb

Please sign in to comment.