Skip to content

Commit

Permalink
[C#] Add FasterLog.CreateAsync and RecoverReadOnlyAsync (#384)
Browse files Browse the repository at this point in the history
* First pass on RecoverAsync
* Change to semaphore
* FIx stale tokens; make test names consistent
* add readonly
* Add CancellationToken to RecoverAsync; increase CancellationToken usage in a couple other places
* Add clarifying comments and expected behavior in FasterLogPubSub. Add FasterLog.CreateAsync (which calls RecoverAsync) and RecoverReadOnlyAsync. Replace obsolete DeleteDirectory functions with Directory.Delete(path, true)
* Remove some debugging code
* FasterLogPubSub: catch OperationCanceledException, add using/Dispose()
  • Loading branch information
TedHartMS committed Dec 12, 2020
1 parent fa39966 commit 1a88326
Show file tree
Hide file tree
Showing 16 changed files with 396 additions and 321 deletions.
138 changes: 93 additions & 45 deletions cs/samples/FasterLogPubSub/Program.cs
Expand Up @@ -18,21 +18,38 @@ class Program

static async Task Main()
{
var device = Devices.CreateLogDevice(path + "mylog");
// This is two samples in one, enumerating over the same FasterLog instance that does commits, or over a separate
// FasterLog that opens the log file read-only and continuously catches up with the first intance's commits.
const bool sameInstance = true;
#pragma warning disable CS0162 // Unreachable code detected
if (!sameInstance)
{
// Because the SAME-instance iterator illustrates truncating the log, the SEPARATE-instance may encounter EOF
// issues if it is run after that truncation without cleaning up the directory first.
// In all other cases, the sample should run without needing to clean up the directory.
//if (Directory.Exists(path)) Directory.Delete(path, true);
}

var device = Devices.CreateLogDevice(path + "mylog");
var log = new FasterLog(new FasterLogSettings { LogDevice = device, MemorySizeBits = 11, PageSizeBits = 9, MutableFraction = 0.5, SegmentSizeBits = 9 });

using var cts = new CancellationTokenSource();

var producer = ProducerAsync(log, cts.Token);
var commiter = CommitterAsync(log, cts.Token);
var committer = CommitterAsync(log, cts.Token);

// Consumer on SAME FasterLog instance
var consumer = ConsumerAsync(log, true, cts.Token);
Task consumer;
if (sameInstance)
{
// Consumer on SAME FasterLog instance
consumer = ConsumerAsync(log, true, cts.Token);
}
else
{
// Consumer on SEPARATE read-only FasterLog instance
consumer = SeparateConsumerAsync(cts.Token);
}
#pragma warning restore CS0162 // Unreachable code detected

// Uncomment below to run consumer on SEPARATE read-only FasterLog instance
// var consumer = SeparateConsumerAsync(cts.Token);

Console.CancelKeyPress += (o, eventArgs) =>
{
Console.WriteLine("Cancelling program...");
Expand All @@ -42,86 +59,117 @@ static async Task Main()

await producer;
await consumer;
await commiter;

await committer;
Console.WriteLine("Finished.");

log.Dispose();
device.Dispose();
try { new DirectoryInfo(path).Delete(true); } catch { }
}

static async Task CommitterAsync(FasterLog log, CancellationToken cancellationToken)
{
while (!cancellationToken.IsCancellationRequested)
try
{
await Task.Delay(TimeSpan.FromMilliseconds(commitPeriodMs), cancellationToken);
while (!cancellationToken.IsCancellationRequested)
{
await Task.Delay(TimeSpan.FromMilliseconds(commitPeriodMs), cancellationToken);

Console.WriteLine("Committing...");
Console.WriteLine("Committing...");

await log.CommitAsync();
await log.CommitAsync(cancellationToken);
}
}
catch (OperationCanceledException) { }
Console.WriteLine("Committer complete");
}

static async Task ProducerAsync(FasterLog log, CancellationToken cancellationToken)
{
var i = 0L;
while (!cancellationToken.IsCancellationRequested)
try
{
// Console.WriteLine($"Producing {i}");
var i = 0L;
while (!cancellationToken.IsCancellationRequested)
{
// Console.WriteLine($"Producing {i}");

log.Enqueue(Encoding.UTF8.GetBytes(i.ToString()));
log.RefreshUncommitted();
log.Enqueue(Encoding.UTF8.GetBytes(i.ToString()));
log.RefreshUncommitted();

i++;
i++;

await Task.Delay(TimeSpan.FromMilliseconds(10));
await Task.Delay(TimeSpan.FromMilliseconds(10));
}
}
catch (OperationCanceledException) { }
Console.WriteLine("Producer complete");
}

static async Task ConsumerAsync(FasterLog log, bool scanUncommitted, CancellationToken cancellationToken)
{
using var iter = log.Scan(log.BeginAddress, long.MaxValue, "foo", true, ScanBufferingMode.DoublePageBuffering, scanUncommitted);

int count = 0;
await foreach (var (result, length, currentAddress, nextAddress) in iter.GetAsyncEnumerable(cancellationToken))
try
{
Console.WriteLine($"Consuming {Encoding.UTF8.GetString(result)}");
iter.CompleteUntil(nextAddress);
log.TruncateUntil(nextAddress);

// We simulate temporary slow down of data consumption
// This will cause transient log spill to disk (observe folder on storage)
if (count++ > 1000 && count < 1200)
Thread.Sleep(100);
int count = 0;
await foreach (var (result, length, currentAddress, nextAddress) in iter.GetAsyncEnumerable(cancellationToken))
{
Console.WriteLine($"Same Log Consuming {Encoding.UTF8.GetString(result)}");
iter.CompleteUntil(nextAddress);
log.TruncateUntil(nextAddress);

// Simulate temporary slow down of data consumption
// This will cause transient log spill to disk (observe folder on storage)
if (count++ > 1000 && count < 1200)
Thread.Sleep(100);
}
}
catch (OperationCanceledException) { }
Console.WriteLine("Consumer complete");
}

// This creates a separate FasterLog over the same log file, using RecoverReadOnly to continuously update
// to the primary FasterLog's commits.
static async Task SeparateConsumerAsync(CancellationToken cancellationToken)
{
var device = Devices.CreateLogDevice(path + "mylog");
var log = new FasterLog(new FasterLogSettings { LogDevice = device, ReadOnlyMode = true, PageSizeBits = 9, SegmentSizeBits = 9 });
var _ = RecoverAsync(log, cancellationToken);

using var iter = log.Scan(log.BeginAddress, long.MaxValue);
using var device = Devices.CreateLogDevice(path + "mylog");
using var log = new FasterLog(new FasterLogSettings { LogDevice = device, ReadOnlyMode = true, PageSizeBits = 9, SegmentSizeBits = 9 });

await foreach (var (result, length, currentAddress, nextAddress) in iter.GetAsyncEnumerable(cancellationToken))
try
{
Console.WriteLine($"Consuming {Encoding.UTF8.GetString(result)}");
iter.CompleteUntil(nextAddress);
var _ = BeginRecoverAsyncLoop(log, cancellationToken);

// This enumerator waits asynchronously when we have reached the committed tail of the duplicate FasterLog. When RecoverReadOnly
// reads new data committed by the primary FasterLog, it signals commit completion to let iter continue to the new tail.
using var iter = log.Scan(log.BeginAddress, long.MaxValue);
await foreach (var (result, length, currentAddress, nextAddress) in iter.GetAsyncEnumerable(cancellationToken))
{
Console.WriteLine($"Separate Log Consuming {Encoding.UTF8.GetString(result)}");
iter.CompleteUntil(nextAddress);
}
}
catch (OperationCanceledException) { }
Console.WriteLine("SeparateConsumer complete");
}

static async Task RecoverAsync(FasterLog log, CancellationToken cancellationToken)
static async Task BeginRecoverAsyncLoop(FasterLog log, CancellationToken cancellationToken)
{
while (!cancellationToken.IsCancellationRequested)
try
{
await Task.Delay(TimeSpan.FromMilliseconds(restorePeriodMs), cancellationToken);
while (!cancellationToken.IsCancellationRequested)
{
// Delay for a while before checking again.
await Task.Delay(TimeSpan.FromMilliseconds(restorePeriodMs), cancellationToken);

Console.WriteLine("Restoring ...");
Console.WriteLine("Restoring Separate Log...");

log.RecoverReadOnly();
// Recover to the last commit by the primary FasterLog instance.
await log.RecoverReadOnlyAsync(cancellationToken);
}
}
catch (OperationCanceledException) { }
Console.WriteLine("RecoverAsyncLoop complete");
}

}
}
2 changes: 1 addition & 1 deletion cs/src/core/Device/LocalStorageDevice.cs
Expand Up @@ -200,7 +200,7 @@ private void RecoverFiles()
results.Enqueue(result);
}
catch
{
{
Interlocked.Decrement(ref numPending);
callback(uint.MaxValue, 0, context);
results.Enqueue(result);
Expand Down

0 comments on commit 1a88326

Please sign in to comment.