diff --git a/cs/src/core/Allocator/ScanIteratorBase.cs b/cs/src/core/Allocator/ScanIteratorBase.cs
index db3b87124..8ccdbc9b0 100644
--- a/cs/src/core/Allocator/ScanIteratorBase.cs
+++ b/cs/src/core/Allocator/ScanIteratorBase.cs
@@ -178,6 +178,35 @@ protected unsafe bool BufferAndLoad(long currentAddress, long currentPage, long
return WaitForFrameLoad(currentAddress, currentFrame);
}
+ /// <summary>
+ /// Whether we need to buffer a new page from disk
+ /// </summary>
+ protected unsafe bool NeedBufferAndLoad(long currentAddress, long currentPage, long currentFrame, long headAddress, long endAddress)
+ {
+ for (int i = 0; i < frameSize; i++)
+ {
+ var nextPage = currentPage + i;
+
+ var pageStartAddress = nextPage << logPageSizeBits;
+
+ // Cannot load page if it is entirely in memory or beyond the end address
+ if (pageStartAddress >= headAddress || pageStartAddress >= endAddress)
+ continue;
+
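+ // Clamp the load-until address to the earlier of the end address and the head address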
+ var pageEndAddress = (nextPage + 1) << logPageSizeBits;
+ if (endAddress < pageEndAddress)
+ pageEndAddress = endAddress;
+ if (headAddress < pageEndAddress)
+ pageEndAddress = headAddress;
+
+ var nextFrame = (currentFrame + i) % frameSize;
+
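+ // A load would be issued if either the in-flight (nextLoadedPage) or completed (loadedPage) load for this frame falls short of the required range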
+ if (nextLoadedPage[nextFrame] < pageEndAddress || loadedPage[nextFrame] < pageEndAddress)
+ return true;
+ }
+ return false;
+ }
+
internal abstract void AsyncReadPagesFromDeviceToFrame<TContext>(long readPageStart, int numPages, long untilAddress, TContext context, out CountdownEvent completed, long devicePageOffset = 0, IDevice device = null, IDevice objectLogDevice = null, CancellationTokenSource cts = null);
private bool WaitForFrameLoad(long currentAddress, long currentFrame)
diff --git a/cs/src/core/FasterLog/FasterLog.cs b/cs/src/core/FasterLog/FasterLog.cs
index 8235a4672..78e2d4ae7 100644
--- a/cs/src/core/FasterLog/FasterLog.cs
+++ b/cs/src/core/FasterLog/FasterLog.cs
@@ -94,6 +94,11 @@ public sealed class FasterLog : IDisposable
/// </summary>
public byte[] RecoveredCookie;
+ /// <summary>
+ /// Header size used by FasterLog
+ /// </summary>
+ public int HeaderSize => headerSize;
+
/// <summary>
/// Task notifying commit completions
/// </summary>
@@ -373,6 +378,20 @@ public long Enqueue(ReadOnlySpan<byte> entry)
return logicalAddress;
}
+ /// <summary>
+ /// Enqueue raw pre-formatted bytes with headers to the log (in memory).
+ /// </summary>
+ /// <param name="entryBytes">Raw bytes to be enqueued to log</param>
+ /// <returns>First logical address of added entries</returns>
+ public long UnsafeEnqueueRaw(ReadOnlySpan<byte> entryBytes)
+ {
+ long logicalAddress;
+ while (!UnsafeTryEnqueueRaw(entryBytes, out logicalAddress))
+ Thread.Yield();
+ return logicalAddress;
+ }
+
/// <summary>
/// Enqueue batch of entries to log (in memory) - no guarantee of flush/commit
/// </summary>
@@ -536,6 +555,44 @@ public unsafe bool TryEnqueue(byte[] entry, out long logicalAddress)
return true;
}
+ /// <summary>
+ /// Try to enqueue raw pre-formatted bytes with headers to the log (in memory). If it returns true, we are
+ /// done. If it returns false, we need to retry.
+ /// </summary>
+ /// <param name="entryBytes">Entry bytes to be enqueued to log</param>
+ /// <param name="logicalAddress">Logical address of added entry</param>
+ /// <returns>Whether the append succeeded</returns>
+ public unsafe bool UnsafeTryEnqueueRaw(ReadOnlySpan<byte> entryBytes, out long logicalAddress)
+ {
+ int length = entryBytes.Length;
+
+ // Length should be pre-aligned
+ Debug.Assert(length == Align(length));
+ logicalAddress = 0;
+ int allocatedLength = length;
+ ValidateAllocatedLength(allocatedLength);
+
+ epoch.Resume();
+
+ if (commitNum == long.MaxValue) throw new FasterException("Attempting to enqueue into a completed log");
+
+ logicalAddress = allocator.TryAllocateRetryNow(allocatedLength);
+ if (logicalAddress == 0)
+ {
+ epoch.Suspend();
+ if (cannedException != null) throw cannedException;
+ return false;
+ }
+
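+ // Copy the pre-formatted bytes (headers included) verbatim into the allocated log space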
+ var physicalAddress = allocator.GetPhysicalAddress(logicalAddress);
+ entryBytes.CopyTo(new Span<byte>((byte*)physicalAddress, length));
+ if (AutoRefreshSafeTailAddress) DoAutoRefreshSafeTailAddress();
+ epoch.Suspend();
+ if (AutoCommit) Commit();
+ return true;
+ }
+
/// <summary>
/// Try to append entry to log. If it returns true, we are
/// done. If it returns false, we need to retry.
@@ -2610,6 +2667,22 @@ internal unsafe int GetLength(byte* ptr)
return 0;
}
+ /// <summary>
+ /// Get length of entry from pointer to header
+ /// </summary>
+ /// <param name="headerPtr"></param>
+ /// <returns></returns>
+ public unsafe int UnsafeGetLength(byte* headerPtr)
+ => GetLength(headerPtr);
+
+ /// <summary>
+ /// Get aligned version of record length
+ /// </summary>
+ /// <param name="length"></param>
+ /// <returns></returns>
+ public int UnsafeAlign(int length)
+ => Align(length);
+
[MethodImpl(MethodImplOptions.AggressiveInlining)]
internal unsafe bool VerifyChecksum(byte* ptr, int length)
{
diff --git a/cs/src/core/FasterLog/FasterLogIterator.cs b/cs/src/core/FasterLog/FasterLogIterator.cs
index 92260b12d..f4d8559a4 100644
--- a/cs/src/core/FasterLog/FasterLogIterator.cs
+++ b/cs/src/core/FasterLog/FasterLogIterator.cs
@@ -131,6 +131,28 @@ public async IAsyncEnumerable<(IMemoryOwner<byte> entry, int entryLength, long c
}
}
+ /// <summary>
+ /// Asynchronously consume the log with the given consumer until the end of iteration or cancellation
+ /// </summary>
+ /// <param name="consumer">consumer</param>
+ /// <param name="throttleMs">throttle the iteration speed</param>
+ /// <param name="maxChunkSize">max size of returned chunk</param>
+ /// <param name="token">cancellation token</param>
+ /// <typeparam name="T">consumer type</typeparam>
+ public async Task BulkConsumeAllAsync<T>(T consumer, int throttleMs = 0, int maxChunkSize = 0, CancellationToken token = default) where T : IBulkLogEntryConsumer
+ {
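+ // Keep consuming chunks until the iterator is disposed or WaitAsync reports completion/cancellation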
+ while (!disposed)
+ {
+ // TryConsumeNext returns false if we have to wait for the next record.
+ while (!TryBulkConsumeNext(consumer, maxChunkSize))
+ {
+ if (!await WaitAsync(token).ConfigureAwait(false))
+ return;
+ if (throttleMs > 0) await Task.Delay(throttleMs, token).ConfigureAwait(false);
+ }
+ }
+ }
+
/// <summary>
/// Wait for iteration to be ready to continue
/// </summary>
@@ -242,7 +264,7 @@ public unsafe bool GetNext(out byte[] entry, out int entryLength, out long curre
{
var hasNext = GetNextInternal(out physicalAddress, out entryLength, out currentAddress,
out nextAddress,
- out isCommitRecord);
+ out isCommitRecord, out _);
if (!hasNext)
{
entry = default;
@@ -337,7 +359,7 @@ public unsafe bool GetNext(MemoryPool<byte> pool, out IMemoryOwner<byte> entry,
{
var hasNext = GetNextInternal(out physicalAddress, out entryLength, out currentAddress,
out nextAddress,
- out isCommitRecord);
+ out isCommitRecord, out _);
if (!hasNext)
{
entry = default;
@@ -403,7 +425,7 @@ public unsafe bool GetNext(MemoryPool<byte> pool, out IMemoryOwner<byte> entry,
{
var hasNext = GetNextInternal(out physicalAddress, out entryLength, out currentAddress,
out nextAddress,
- out isCommitRecord);
+ out isCommitRecord, out _);
if (!hasNext)
{
epoch.Suspend();
@@ -433,6 +455,82 @@ public unsafe bool GetNext(MemoryPool pool, out IMemoryOwner entry,
}
}
+ /// <summary>
+ /// Consume the next entry in the log with the given consumer
+ /// </summary>
+ /// <param name="consumer">consumer</param>
+ /// <param name="maxChunkSize">max size of returned chunk</param>
+ /// <typeparam name="T">concrete type of consumer</typeparam>
+ /// <returns>whether a next entry is present</returns>
+ public unsafe bool TryBulkConsumeNext<T>(T consumer, int maxChunkSize = 0) where T : IBulkLogEntryConsumer
+ {
+ if (maxChunkSize == 0) maxChunkSize = allocator.PageSize;
+
+ if (disposed)
+ {
+ currentAddress = default;
+ nextAddress = default;
+ return false;
+ }
+
+ bool retVal;
+
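+ // Scan under epoch protection so in-memory log pages are not evicted or reused while we read them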
+ epoch.Resume();
+
+ // Find a contiguous set of log entries
+ try
+ {
+ while (true)
+ {
+ var hasNext = GetNextInternal(out long startPhysicalAddress, out int newEntryLength, out long startLogicalAddress, out long endLogicalAddress, out bool isCommitRecord, out bool onFrame);
+
+ if (!hasNext)
+ {
+ retVal = false;
+ break;
+ }
+
+ // GetNextInternal returns only the payload length, so adjust the totalLength
+ int totalLength = headerSize + Align(newEntryLength);
+
+ // Expand the records in iteration, as long as they are on the same physical page
+ while (ExpandGetNextInternal(startPhysicalAddress, ref totalLength, out long newCurrentAddress, out endLogicalAddress, out isCommitRecord))
+ {
+ if (totalLength > maxChunkSize)
+ break;
+ }
+
+ // Consume the chunk
+ if (onFrame)
+ {
+ // Record is on the read frame, so we do not need epoch protection to access it
+ epoch.Suspend();
+ try
+ {
+ consumer.Consume((byte*)startPhysicalAddress, totalLength, startLogicalAddress, endLogicalAddress);
+ }
+ finally
+ {
+ epoch.Resume();
+ }
+ }
+ else
+ {
+ // Consume the chunk (warning: we are under epoch protection here, as we are consuming directly from main memory log buffer)
+ consumer.Consume((byte*)startPhysicalAddress, totalLength, startLogicalAddress, endLogicalAddress);
+
+ // Refresh epoch to maintain liveness of log append
+ epoch.ProtectAndDrain();
+ }
+ }
+ }
+ finally
+ {
+ epoch.Suspend();
+ }
+ return retVal;
+ }
+
/// <summary>
/// WARNING: advanced users only.
/// Get next record in iterator, accessing unsafe raw bytes and retaining epoch protection.
@@ -463,7 +561,7 @@ public unsafe bool UnsafeGetNext(out byte* entry, out int entryLength, out long
{
var hasNext = GetNextInternal(out physicalAddress, out entryLength, out currentAddress,
out nextAddress,
- out isCommitRecord);
+ out isCommitRecord, out _);
if (!hasNext)
{
entry = default;
@@ -603,7 +701,7 @@ internal unsafe bool ScanForwardForCommit(ref FasterLogRecoveryInfo info, long c
// Continue looping until we find a record that is a commit record
while (GetNextInternal(out long physicalAddress, out var entryLength, out long currentAddress,
out long nextAddress,
- out var isCommitRecord))
+ out var isCommitRecord, out _))
{
if (!isCommitRecord) continue;
@@ -648,8 +746,9 @@ internal unsafe bool ScanForwardForCommit(ref FasterLogRecoveryInfo info, long c
/// <param name="currentAddress"></param>
/// <param name="outNextAddress"></param>
/// <param name="commitRecord"></param>
+ /// <param name="onFrame"></param>
/// <returns></returns>
- private unsafe bool GetNextInternal(out long physicalAddress, out int entryLength, out long currentAddress, out long outNextAddress, out bool commitRecord)
+ private unsafe bool GetNextInternal(out long physicalAddress, out int entryLength, out long currentAddress, out long outNextAddress, out bool commitRecord, out bool onFrame)
{
while (true)
{
@@ -658,6 +757,7 @@ private unsafe bool GetNextInternal(out long physicalAddress, out int entryLengt
currentAddress = nextAddress;
outNextAddress = currentAddress;
commitRecord = false;
+ onFrame = false;
// Check for boundary conditions
if (currentAddress < allocator.BeginAddress)
@@ -692,6 +792,7 @@ private unsafe bool GetNextInternal(out long physicalAddress, out int entryLengt
if (BufferAndLoad(currentAddress, _currentPage, _currentFrame, _headAddress, _endAddress))
continue;
physicalAddress = frame.GetPhysicalAddress(_currentFrame, _currentOffset);
+ onFrame = true;
}
else
{
@@ -758,5 +859,104 @@ private unsafe bool GetNextInternal(out long physicalAddress, out int entryLengt
}
}
}
+
+ private unsafe bool ExpandGetNextInternal(long startPhysicalAddress, ref int totalEntryLength, out long currentAddress, out long outNextAddress, out bool commitRecord)
+ {
+ while (true)
+ {
+ long physicalAddress;
+ int entryLength;
+ currentAddress = nextAddress;
+ outNextAddress = currentAddress;
+ commitRecord = false;
+
+ // Check for boundary conditions
+ if (currentAddress < allocator.BeginAddress)
+ {
+ Utility.MonotonicUpdate(ref nextAddress, allocator.BeginAddress, out _);
+ currentAddress = nextAddress;
+ outNextAddress = currentAddress;
+ }
+
+ var _currentPage = currentAddress >> allocator.LogPageSizeBits;
+ var _currentFrame = _currentPage % frameSize;
+ var _currentOffset = currentAddress & allocator.PageSizeMask;
+ var _headAddress = allocator.HeadAddress;
+
+ if (disposed)
+ return false;
+
+ if ((currentAddress >= endAddress) || (currentAddress >= (scanUncommitted ? fasterLog.SafeTailAddress : fasterLog.CommittedUntilAddress)))
+ return false;
+
+ if (currentAddress < _headAddress)
+ {
+ var _endAddress = endAddress;
+ if (fasterLog.readOnlyMode)
+ {
+ // Support partial page reads of committed data
+ var _flush = fasterLog.CommittedUntilAddress;
+ if (_flush < endAddress)
+ _endAddress = _flush;
+ }
+
+ if (NeedBufferAndLoad(currentAddress, _currentPage, _currentFrame, _headAddress, _endAddress))
+ return false;
+
+ physicalAddress = frame.GetPhysicalAddress(_currentFrame, _currentOffset);
+ }
+ else
+ {
+ physicalAddress = allocator.GetPhysicalAddress(currentAddress);
+ }
+
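+ // The next entry must be physically contiguous with the chunk assembled so far; otherwise stop expanding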
+ if (physicalAddress != startPhysicalAddress + totalEntryLength)
+ return false;
+
+ // Get and check entry length
+ entryLength = fasterLog.GetLength((byte*)physicalAddress);
+
+ // We may encounter zeroed-out bytes at the end of a page in a normal log, so we need to check for that case
+ if (entryLength == 0)
+ {
+ return false;
+ }
+
+ // commit records have negative length fields
+ if (entryLength < 0)
+ {
+ commitRecord = true;
+ entryLength = -entryLength;
+ }
+
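+ // Stop expanding if the aligned record would cross the page boundary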
+ int recordSize = headerSize + Align(entryLength);
+ if (_currentOffset + recordSize > allocator.PageSize)
+ {
+ return false;
+ }
+
+ // Verify checksum if needed
+ if (currentAddress < _headAddress)
+ {
+ if (!fasterLog.VerifyChecksum((byte*)physicalAddress, entryLength))
+ {
+ return false;
+ }
+ }
+
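+ // If the record ends exactly at the page boundary, skip ahead to the start of the next page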
+ if ((currentAddress & allocator.PageSizeMask) + recordSize == allocator.PageSize)
+ currentAddress = (1 + (currentAddress >> allocator.LogPageSizeBits)) << allocator.LogPageSizeBits;
+ else
+ currentAddress += recordSize;
+
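+ // Atomically advance the iterator; on success, extend the chunk by this record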
+ if (Utility.MonotonicUpdate(ref nextAddress, currentAddress, out long oldCurrentAddress))
+ {
+ totalEntryLength += recordSize;
+ outNextAddress = currentAddress;
+ currentAddress = oldCurrentAddress;
+ return true;
+ }
+ }
+ }
}
}
diff --git a/cs/src/core/FasterLog/ILogEntryConsumer.cs b/cs/src/core/FasterLog/ILogEntryConsumer.cs
index cbabc2ceb..e8f07dde4 100644
--- a/cs/src/core/FasterLog/ILogEntryConsumer.cs
+++ b/cs/src/core/FasterLog/ILogEntryConsumer.cs
@@ -15,4 +15,20 @@ public interface ILogEntryConsumer
/// <param name="nextAddress">(predicted) address of the next entry</param>
public void Consume(ReadOnlySpan<byte> entry, long currentAddress, long nextAddress);
}
+
+ /// <summary>
+ /// Consumes FasterLog entries in bulk (raw data) without copying
+ /// </summary>
+ public interface IBulkLogEntryConsumer
+ {
+ /// <summary>
+ /// Consumes the given bulk entries (raw data).
+ /// </summary>
+ /// <param name="payloadPtr"></param>
+ /// <param name="payloadLength"></param>
+ /// <param name="currentAddress">address of the consumed entry</param>
+ /// <param name="nextAddress">(predicted) address of the next entry</param>
+ unsafe void Consume(byte* payloadPtr, int payloadLength, long currentAddress, long nextAddress);
+ }
+
}
\ No newline at end of file