Skip to content

Commit

Permalink
Unsafe API to access fasterlog internals (#837)
Browse files Browse the repository at this point in the history
* Unsafe API to access fasterlog internals

* updates
  • Loading branch information
badrishc committed May 25, 2023
1 parent d6c3a82 commit 7d5d5ff
Show file tree
Hide file tree
Showing 4 changed files with 324 additions and 6 deletions.
29 changes: 29 additions & 0 deletions cs/src/core/Allocator/ScanIteratorBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,35 @@ protected unsafe bool BufferAndLoad(long currentAddress, long currentPage, long
return WaitForFrameLoad(currentAddress, currentFrame);
}

/// <summary>
/// Whether we need to buffer new page from disk
/// </summary>
protected unsafe bool NeedBufferAndLoad(long currentAddress, long currentPage, long currentFrame, long headAddress, long endAddress)
{
for (int i = 0; i < frameSize; i++)
{
var nextPage = currentPage + i;

var pageStartAddress = nextPage << logPageSizeBits;

// Cannot load page if it is entirely in memory or beyond the end address
if (pageStartAddress >= headAddress || pageStartAddress >= endAddress)
continue;

var pageEndAddress = (nextPage + 1) << logPageSizeBits;
if (endAddress < pageEndAddress)
pageEndAddress = endAddress;
if (headAddress < pageEndAddress)
pageEndAddress = headAddress;

var nextFrame = (currentFrame + i) % frameSize;

if (nextLoadedPage[nextFrame] < pageEndAddress || loadedPage[nextFrame] < pageEndAddress)
return true;
}
return false;
}

internal abstract void AsyncReadPagesFromDeviceToFrame<TContext>(long readPageStart, int numPages, long untilAddress, TContext context, out CountdownEvent completed, long devicePageOffset = 0, IDevice device = null, IDevice objectLogDevice = null, CancellationTokenSource cts = null);

private bool WaitForFrameLoad(long currentAddress, long currentFrame)
Expand Down
73 changes: 73 additions & 0 deletions cs/src/core/FasterLog/FasterLog.cs
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,11 @@ public sealed class FasterLog : IDisposable
/// </summary>
public byte[] RecoveredCookie;

/// <summary>
/// Header size used by FasterLog
/// </summary>
public int HeaderSize => headerSize;

/// <summary>
/// Task notifying commit completions
/// </summary>
Expand Down Expand Up @@ -373,6 +378,20 @@ public long Enqueue(ReadOnlySpan<byte> entry)
return logicalAddress;
}

/// <summary>
/// Enqueue raw pre-formatted bytes with headers to the log (in memory).
/// </summary>
/// <param name="entryBytes">Raw bytes to be enqueued to log</param>
/// <returns>First logical address of added entries</returns>
public long UnsafeEnqueueRaw(ReadOnlySpan<byte> entryBytes)
{
long logicalAddress;
while (!UnsafeTryEnqueueRaw(entryBytes, out logicalAddress))
Thread.Yield();
return logicalAddress;

}

/// <summary>
/// Enqueue batch of entries to log (in memory) - no guarantee of flush/commit
/// </summary>
Expand Down Expand Up @@ -536,6 +555,44 @@ public unsafe bool TryEnqueue(byte[] entry, out long logicalAddress)
return true;
}

/// <summary>
/// Try to enqueue raw pre-formatted bytes with headers to the log (in memory). If it returns true, we are
/// done. If it returns false, we need to retry.
/// </summary>
/// <param name="entryBytes">Entry bytes to be enqueued to log</param>
/// <param name="logicalAddress">Logical address of added entry</param>
/// <returns>Whether the append succeeded</returns>
public unsafe bool UnsafeTryEnqueueRaw(ReadOnlySpan<byte> entryBytes, out long logicalAddress)
{
int length = entryBytes.Length;

// Length should be pre-aligned
Debug.Assert(length == Align(length));
logicalAddress = 0;
int allocatedLength = length;
ValidateAllocatedLength(allocatedLength);

epoch.Resume();

if (commitNum == long.MaxValue) throw new FasterException("Attempting to enqueue into a completed log");

logicalAddress = allocator.TryAllocateRetryNow(allocatedLength);
if (logicalAddress == 0)
if (logicalAddress == 0)
{
epoch.Suspend();
if (cannedException != null) throw cannedException;
return false;
}

var physicalAddress = allocator.GetPhysicalAddress(logicalAddress);
entryBytes.CopyTo(new Span<byte>((byte*)physicalAddress, length));
if (AutoRefreshSafeTailAddress) DoAutoRefreshSafeTailAddress();
epoch.Suspend();
if (AutoCommit) Commit();
return true;
}

/// <summary>
/// Try to append entry to log. If it returns true, we are
/// done. If it returns false, we need to retry.
Expand Down Expand Up @@ -2610,6 +2667,22 @@ internal unsafe int GetLength(byte* ptr)
return 0;
}

/// <summary>
/// Get length of entry from pointer to header
/// </summary>
/// <param name="headerPtr"></param>
/// <returns></returns>
public unsafe int UnsafeGetLength(byte* headerPtr)
=> GetLength(headerPtr);

/// <summary>
/// Get aligned version of record length
/// </summary>
/// <param name="length"></param>
/// <returns></returns>
public int UnsafeAlign(int length)
=> Align(length);

[MethodImpl(MethodImplOptions.AggressiveInlining)]
internal unsafe bool VerifyChecksum(byte* ptr, int length)
{
Expand Down
Loading

0 comments on commit 7d5d5ff

Please sign in to comment.