Skip to content

Commit

Permalink
Expand FasterLog Initialize API
Browse files Browse the repository at this point in the history
  • Loading branch information
badrishc committed May 18, 2023
1 parent 3450702 commit d6c3a82
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 51 deletions.
105 changes: 55 additions & 50 deletions cs/src/core/FasterLog/FasterLog.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ namespace FASTER.core
public sealed class FasterLog : IDisposable
{
private Exception cannedException = null;

readonly BlittableAllocator<Empty, byte> allocator;
readonly LightEpoch epoch;
readonly ILogCommitManager logCommitManager;
Expand Down Expand Up @@ -230,27 +230,31 @@ public void Reset()
/// Initialize new log instance with specific begin address and (optional) last commit number
/// </summary>
/// <param name="beginAddress"></param>
/// <param name="committedUntilAddress"></param>
/// <param name="lastCommitNum"></param>
public void Initialize(long beginAddress, long lastCommitNum = 0)
public void Initialize(long beginAddress, long committedUntilAddress, long lastCommitNum = 0)
{
Debug.Assert(!readOnlyMode);

if (beginAddress == 0)
beginAddress = allocator.GetFirstValidLogicalAddress(0);

if (committedUntilAddress == 0)
committedUntilAddress = beginAddress;

try
{
allocator.Reset();
allocator.RestoreHybridLog(beginAddress, beginAddress, beginAddress, beginAddress);
allocator.RestoreHybridLog(beginAddress, committedUntilAddress, committedUntilAddress, committedUntilAddress);
}
catch
{
if (!tolerateDeviceFailure) throw;
}

CommittedUntilAddress = beginAddress;
CommittedUntilAddress = committedUntilAddress;
CommittedBeginAddress = beginAddress;
SafeTailAddress = beginAddress;
SafeTailAddress = committedUntilAddress;

commitNum = lastCommitNum;
this.beginAddress = beginAddress;
Expand Down Expand Up @@ -341,7 +345,7 @@ internal void TrueDispose()
if (disposeLogCommitManager)
logCommitManager.Dispose();
}

#region Enqueue
/// <summary>
/// Enqueue entry to log (in memory) - no guarantee of flush/commit
Expand Down Expand Up @@ -381,7 +385,7 @@ public long Enqueue(IReadOnlySpanBatch readOnlySpanBatch)
Thread.Yield();
return logicalAddress;
}

/// <summary>
/// Enqueue batch of entries to log (in memory) - no guarantee of flush/commit
/// </summary>
Expand Down Expand Up @@ -439,9 +443,9 @@ public long Enqueue(IReadOnlySpanBatch readOnlySpanBatch)
if (cannedException != null) throw cannedException;
return false;
}

var physicalAddress = allocator.GetPhysicalAddress(logicalAddress);
entry.SerializeTo(new Span<byte>((void *) (headerSize + physicalAddress), length));
entry.SerializeTo(new Span<byte>((void*)(headerSize + physicalAddress), length));
SetHeader(length, (byte*)physicalAddress);
if (AutoRefreshSafeTailAddress) DoAutoRefreshSafeTailAddress();
epoch.Suspend();
Expand Down Expand Up @@ -482,10 +486,10 @@ public long Enqueue(IReadOnlySpanBatch readOnlySpanBatch)
}

var physicalAddress = allocator.GetPhysicalAddress(logicalAddress);
foreach(var entry in entries)
foreach (var entry in entries)
{
var length = entry.SerializedLength;
entry.SerializeTo(new Span<byte>((void *)(headerSize + physicalAddress), length));
entry.SerializeTo(new Span<byte>((void*)(headerSize + physicalAddress), length));
SetHeader(length, (byte*)physicalAddress);
physicalAddress += Align(length) + headerSize;
}
Expand All @@ -494,7 +498,7 @@ public long Enqueue(IReadOnlySpanBatch readOnlySpanBatch)
if (AutoCommit) Commit();
return true;
}

/// <summary>
/// Try to enqueue entry to log (in memory). If it returns true, we are
/// done. If it returns false, we need to retry.
Expand All @@ -515,13 +519,13 @@ public unsafe bool TryEnqueue(byte[] entry, out long logicalAddress)

logicalAddress = allocator.TryAllocateRetryNow(allocatedLength);
if (logicalAddress == 0)
if (logicalAddress == 0)
{
epoch.Suspend();
if (cannedException != null) throw cannedException;
return false;
}
if (logicalAddress == 0)
{
epoch.Suspend();
if (cannedException != null) throw cannedException;
return false;
}

var physicalAddress = allocator.GetPhysicalAddress(logicalAddress);
fixed (byte* bp = entry)
Buffer.MemoryCopy(bp, (void*)(headerSize + physicalAddress), length, length);
Expand Down Expand Up @@ -557,7 +561,7 @@ public unsafe bool TryEnqueue(ReadOnlySpan<byte> entry, out long logicalAddress)
if (cannedException != null) throw cannedException;
return false;
}

var physicalAddress = allocator.GetPhysicalAddress(logicalAddress);
fixed (byte* bp = &entry.GetPinnableReference())
Buffer.MemoryCopy(bp, (void*)(headerSize + physicalAddress), length, length);
Expand Down Expand Up @@ -968,7 +972,7 @@ private static async ValueTask<long> SlowEnqueueAsync(FasterLog @this, IReadOnly

return logicalAddress;
}

/// <summary>
/// Enqueue entry to log in memory (async) - completes after entry is
/// appended to memory, NOT committed to storage.
Expand Down Expand Up @@ -1005,7 +1009,7 @@ private static async ValueTask<long> SlowEnqueueAsync<T>(FasterLog @this, T entr

return logicalAddress;
}

/// <summary>
/// Enqueue batch of entries to log in memory (async) - completes after entry is
/// appended to memory, NOT committed to storage.
Expand Down Expand Up @@ -1058,7 +1062,7 @@ public void WaitForCommit(long untilAddress = 0, long commitNum = -1)
{
if (untilAddress == 0) untilAddress = TailAddress;
if (commitNum == -1) commitNum = this.commitNum;

while (commitNum > persistedCommitNum || untilAddress > CommittedUntilAddress)
{
if (cannedException != null) throw cannedException;
Expand Down Expand Up @@ -1104,7 +1108,7 @@ public void Commit(bool spinWait = false)
// Take a lower-bound of the content of this commit in case our request is filtered but we need to spin
var tail = TailAddress;
var lastCommit = commitNum;

var success = CommitInternal(out var actualTail, out var actualCommitNum, true, null, -1, null);
if (!spinWait) return;
if (success)
Expand Down Expand Up @@ -1300,7 +1304,7 @@ public long EnqueueAndWaitForCommit(IReadOnlySpanBatch readOnlySpanBatch)
WaitForCommit(logicalAddress + 1);
return logicalAddress;
}

/// <summary>
/// Append entry to log - spin-waits until entry is committed to storage.
/// Does NOT itself issue flush!
Expand All @@ -1316,7 +1320,7 @@ public long EnqueueAndWaitForCommit(IReadOnlySpanBatch readOnlySpanBatch)
WaitForCommit(logicalAddress + 1);
return logicalAddress;
}

/// <summary>
/// Append entry to log - spin-waits until entry is committed to storage.
/// Does NOT itself issue flush!
Expand Down Expand Up @@ -1492,7 +1496,7 @@ public async ValueTask<long> EnqueueAndWaitForCommitAsync(IReadOnlySpanBatch rea

return logicalAddress;
}

/// <summary>
/// Append entry to log (async) - completes after entry is committed to storage.
/// Does NOT itself issue flush!
Expand Down Expand Up @@ -1754,7 +1758,7 @@ public async ValueTask<int> ReadRecordLengthAsync(long address, CancellationToke
allocator.AsyncReadRecordToMemory(address, headerSize, AsyncGetHeaderOnlyFromDiskCallback, ref ctx);
}
epoch.Suspend();
await ctx.completedRead.WaitAsync(token).ConfigureAwait(false);
await ctx.completedRead.WaitAsync(token).ConfigureAwait(false);
return GetRecordLengthAndFree(ctx.record);
}

Expand Down Expand Up @@ -1829,12 +1833,12 @@ private void CommitCallback(CommitInfo commitInfo)
private unsafe bool TryEnqueueCommitRecord(ref FasterLogRecoveryInfo info)
{
var entryBodySize = info.SerializedSize();

int allocatedLength = headerSize + Align(entryBodySize);
ValidateAllocatedLength(allocatedLength);

epoch.Resume();

var logicalAddress = allocator.TryAllocateRetryNow(allocatedLength);
if (logicalAddress == 0)
{
Expand All @@ -1860,12 +1864,12 @@ private bool ShouldCommmitMetadata(ref FasterLogRecoveryInfo info)
{
return beginAddress > CommittedBeginAddress || IteratorsChanged(ref info) || info.Cookie != null;
}

private void CommitMetadataOnly(ref FasterLogRecoveryInfo info)
{
var fromAddress = CommittedUntilAddress > info.BeginAddress ? CommittedUntilAddress : info.BeginAddress;
var untilAddress = FlushedUntilAddress > info.BeginAddress ? FlushedUntilAddress : info.BeginAddress;

CommitCallback(new CommitInfo
{
FromAddress = fromAddress,
Expand Down Expand Up @@ -1941,7 +1945,7 @@ private void SerialCommitCallbackWorker(CommitInfo commitInfo)
ongoingCommitRequests.Dequeue();
}
}

// Nothing was committed --- this was probably an auto-flush. Return now without touching any
// commit task tracking.
if (coveredCommits.Count == 0) return;
Expand Down Expand Up @@ -1971,7 +1975,7 @@ private void SerialCommitCallbackWorker(CommitInfo commitInfo)

// We fast-forwarded commits earlier, so write it out if not covered by another commit
if (latestCommit.FastForwardAllowed) WriteCommitMetadata(latestCommit);

// TODO: Can invoke earlier in the case of fast commit
var _commitTcs = commitTcs;
commitTcs = new TaskCompletionSource<LinkedCommitInfo>(TaskCreationOptions.RunContinuationsAsynchronously);
Expand All @@ -1988,7 +1992,7 @@ private bool IteratorsChanged(ref FasterLogRecoveryInfo info)
var _lastPersistedIterators = LastPersistedIterators;
if (_lastPersistedIterators == null)
{
return info.Iterators != null && info.Iterators.Count != 0;
return info.Iterators != null && info.Iterators.Count != 0;
}
if (info.Iterators == null || _lastPersistedIterators.Count != info.Iterators.Count)
return true;
Expand Down Expand Up @@ -2171,15 +2175,15 @@ private void RestoreSpecificCommit(long requestedCommitNum, out Dictionary<strin
}
catch { }
}

// Need to potentially scan log for the entry
if (scanStart < requestedCommitNum)
{
// If not in fast commit mode, do not scan log
if (!fastCommitMode)
// In the case where precisely requested commit num is not available, can just throw exception
throw new FasterException("requested commit num is not available");

// If no exact metadata is found, scan forward to see if we able to find a commit entry
// Shut up safe guards, I know what I am doing
CommittedUntilAddress = long.MaxValue;
Expand Down Expand Up @@ -2256,7 +2260,8 @@ private async ValueTask<(Dictionary<string, long>, byte[])> RestoreLatestAsync(C
{
using var scanIterator = Scan(info.UntilAddress, long.MaxValue, recover: false);
scanIterator.ScanForwardForCommit(ref info);
} catch { }
}
catch { }
}

// if until address is 0, that means info is still its default value and we haven't been able to recover
Expand All @@ -2272,7 +2277,7 @@ private async ValueTask<(Dictionary<string, long>, byte[])> RestoreLatestAsync(C
allocator.HeadAddress = long.MaxValue;
return (new Dictionary<string, long>(), null);
}

if (!readOnlyMode)
{
var headAddress = info.UntilAddress - allocator.GetOffsetInPage(info.UntilAddress);
Expand All @@ -2281,7 +2286,7 @@ private async ValueTask<(Dictionary<string, long>, byte[])> RestoreLatestAsync(C

if (headAddress == 0)
headAddress = Constants.kFirstValidAddress;
await allocator.RestoreHybridLogAsync(info.BeginAddress, headAddress, info.UntilAddress, info.UntilAddress, cancellationToken : cancellationToken).ConfigureAwait(false);
await allocator.RestoreHybridLogAsync(info.BeginAddress, headAddress, info.UntilAddress, info.UntilAddress, cancellationToken: cancellationToken).ConfigureAwait(false);
}

var iterators = CompleteRestoreFromCommit(info);
Expand Down Expand Up @@ -2472,7 +2477,7 @@ private void AsyncGetHeaderOnlyFromDiskCallback(uint errorCode, uint numBytes, o
Buffer.MemoryCopy(ptr + headerSize, bp, length, length);
}
}

record.Return();
return (result, length);
}
Expand All @@ -2494,22 +2499,22 @@ private int GetRecordLengthAndFree(SectorAlignedMemory record)
record.Return();
return length;
}


private bool CommitInternal(out long commitTail, out long actualCommitNum, bool fastForwardAllowed, byte[] cookie, long proposedCommitNum, Action callback)
{
if (cannedException != null)
throw cannedException;

commitTail = actualCommitNum = 0;

if (readOnlyMode)
throw new FasterException("Cannot commit in read-only mode");

if (fastForwardAllowed && (cookie != null || proposedCommitNum != -1 || callback != null))
throw new FasterException(
"Fast forwarding a commit is only allowed when no cookie, commit num, or callback is specified");

var info = new FasterLogRecoveryInfo
{
FastForwardAllowed = fastForwardAllowed,
Expand All @@ -2535,7 +2540,7 @@ private bool CommitInternal(out long commitTail, out long actualCommitNum, bool
// log has been closed, throw an exception
throw new FasterException("log has already been closed");
}

// Make sure we will not be allowed to back out of a commit if AdmitCommit returns true, as the commit policy
// may need to update internal logic for every true response. We might waste some commit nums if commit
// policy filters out a lot of commits, but that's fine.
Expand Down Expand Up @@ -2565,14 +2570,14 @@ private bool CommitInternal(out long commitTail, out long actualCommitNum, bool
}

Utility.MonotonicUpdate(ref commitCoveredAddress, commitTail, out _);

commitPolicy.OnCommitCreated(info);
// Enqueue the commit record's content and offset into the queue so it can be picked up by the next flush
// At this point, we expect the commit record to be flushed out as a distinct recovery point
ongoingCommitRequests.Enqueue((commitTail, info));
}


// As an optimization, if a concurrent flush has already advanced FlushedUntilAddress
// past this commit, we can manually trigger a commit callback for safety, and return.
if (commitTail <= FlushedUntilAddress)
Expand Down Expand Up @@ -2643,7 +2648,7 @@ private unsafe void SetHeader(int length, byte* dest)
*(ulong*)dest = Utility.XorBytes(dest + 8, length + 4);
}
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
private unsafe void SetCommitRecordHeader(int length, byte* dest)
{
Expand Down
2 changes: 1 addition & 1 deletion cs/test/FasterLogTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ public async ValueTask FasterLogTest2([Values] LogChecksumType logChecksum)
{ LogDevice = device, LogChecksum = logChecksum, LogCommitManager = manager, TryRecoverLatest = false };
log = IsAsync(iteratorType) ? await FasterLog.CreateAsync(logSettings) : new FasterLog(logSettings);

log.Initialize(1000000L, 323);
log.Initialize(1000000L, 1000000L, 323);

log.Commit(true);

Expand Down

0 comments on commit d6c3a82

Please sign in to comment.