Skip to content

Commit

Permalink
Ckpt size retrieval logic fix (#838)
Browse files Browse the repository at this point in the history
* Add

* updates

* updates

* AzureStorageDevice read unloaded blob

* update
  • Loading branch information
badrishc authored Jun 1, 2023
1 parent 73fe128 commit 0e03937
Show file tree
Hide file tree
Showing 4 changed files with 73 additions and 18 deletions.
39 changes: 32 additions & 7 deletions cs/src/core/FasterLog/FasterLog.cs
Original file line number Diff line number Diff line change
Expand Up @@ -382,16 +382,39 @@ public long Enqueue(ReadOnlySpan<byte> entry)
/// Enqueue raw pre-formatted bytes with headers to the log (in memory).
/// </summary>
/// <param name="entryBytes">Raw bytes to be enqueued to log</param>
/// <param name="noCommit">Do not auto-commit</param>
/// <returns>First logical address of added entries</returns>
public long UnsafeEnqueueRaw(ReadOnlySpan<byte> entryBytes)
public long UnsafeEnqueueRaw(ReadOnlySpan<byte> entryBytes, bool noCommit = false)
{
long logicalAddress;
while (!UnsafeTryEnqueueRaw(entryBytes, out logicalAddress))
while (!UnsafeTryEnqueueRaw(entryBytes, noCommit, out logicalAddress))
Thread.Yield();
return logicalAddress;

}

/// <summary>
/// Commit metadata only (no records added to main log). Queues the supplied
/// recovery info as a pending commit request and triggers it immediately.
/// </summary>
/// <param name="info">Recovery metadata to commit; its UntilAddress is used as the commit's cover address.</param>
public void UnsafeCommitMetadataOnly(FasterLogRecoveryInfo info)
{
    // The request queue is shared with the normal commit path; guard enqueue with its lock.
    lock (ongoingCommitRequests)
    {
        ongoingCommitRequests.Enqueue((info.UntilAddress, info));
    }
    try
    {
        // Enter epoch protection before touching the allocator.
        epoch.Resume();
        // ShiftReadOnlyToTail failing (returning false) means no new flush will be
        // triggered to pick up the request, so commit the metadata directly.
        // NOTE(review): relies on the flush path draining ongoingCommitRequests when
        // the shift succeeds — confirm against CommitMetadataOnly/flush callback.
        if (!allocator.ShiftReadOnlyToTail(out _, out _))
            CommitMetadataOnly(ref info);
    }
    finally
    {
        // Always release epoch protection, even if the commit path throws.
        epoch.Suspend();
    }
}

/// <summary>
/// Enqueue batch of entries to log (in memory) - no guarantee of flush/commit
/// </summary>
Expand Down Expand Up @@ -560,9 +583,10 @@ public unsafe bool TryEnqueue(byte[] entry, out long logicalAddress)
/// done. If it returns false, we need to retry.
/// </summary>
/// <param name="entryBytes">Entry bytes to be enqueued to log</param>
/// <param name="noCommit">Do not auto-commit</param>
/// <param name="logicalAddress">Logical address of added entry</param>
/// <returns>Whether the append succeeded</returns>
public unsafe bool UnsafeTryEnqueueRaw(ReadOnlySpan<byte> entryBytes, out long logicalAddress)
public unsafe bool UnsafeTryEnqueueRaw(ReadOnlySpan<byte> entryBytes, bool noCommit, out long logicalAddress)
{
int length = entryBytes.Length;

Expand All @@ -589,7 +613,7 @@ public unsafe bool UnsafeTryEnqueueRaw(ReadOnlySpan<byte> entryBytes, out long l
entryBytes.CopyTo(new Span<byte>((byte*)physicalAddress, length));
if (AutoRefreshSafeTailAddress) DoAutoRefreshSafeTailAddress();
epoch.Suspend();
if (AutoCommit) Commit();
if (AutoCommit && !noCommit) Commit();
return true;
}

Expand Down Expand Up @@ -1693,8 +1717,9 @@ public void TruncateUntilPageStart(long untilAddress)
/// <param name="recover">Whether to recover named iterator from latest commit (if exists). If false, iterator starts from beginAddress.</param>
/// <param name="scanBufferingMode">Use single or double buffering</param>
/// <param name="scanUncommitted">Whether we scan uncommitted data</param>
/// <param name="logger"></param>
/// <returns></returns>
public FasterLogScanIterator Scan(long beginAddress, long endAddress, string name = null, bool recover = true, ScanBufferingMode scanBufferingMode = ScanBufferingMode.DoublePageBuffering, bool scanUncommitted = false)
public FasterLogScanIterator Scan(long beginAddress, long endAddress, string name = null, bool recover = true, ScanBufferingMode scanBufferingMode = ScanBufferingMode.DoublePageBuffering, bool scanUncommitted = false, ILogger logger = null)
{
if (readOnlyMode)
{
Expand All @@ -1711,9 +1736,9 @@ public FasterLogScanIterator Scan(long beginAddress, long endAddress, string nam

FasterLogScanIterator iter;
if (recover && name != null && RecoveredIterators != null && RecoveredIterators.ContainsKey(name))
iter = new FasterLogScanIterator(this, allocator, RecoveredIterators[name], endAddress, getMemory, scanBufferingMode, epoch, headerSize, name, scanUncommitted);
iter = new FasterLogScanIterator(this, allocator, RecoveredIterators[name], endAddress, getMemory, scanBufferingMode, epoch, headerSize, name, scanUncommitted, logger: logger);
else
iter = new FasterLogScanIterator(this, allocator, beginAddress, endAddress, getMemory, scanBufferingMode, epoch, headerSize, name, scanUncommitted);
iter = new FasterLogScanIterator(this, allocator, beginAddress, endAddress, getMemory, scanBufferingMode, epoch, headerSize, name, scanUncommitted, logger: logger);

if (name != null)
{
Expand Down
11 changes: 8 additions & 3 deletions cs/src/core/FasterLog/FasterLogRecoveryInfo.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Text;

namespace FASTER.core
{
Expand Down Expand Up @@ -104,7 +105,9 @@ public void Initialize(BinaryReader reader)
Iterators = new Dictionary<string, long>();
for (int i = 0; i < iteratorCount; i++)
{
Iterators.Add(reader.ReadString(), reader.ReadInt64());
int len = reader.ReadInt32();
byte[] bytes = reader.ReadBytes(len);
Iterators.Add(Encoding.UTF8.GetString(bytes), reader.ReadInt64());
}
}

Expand Down Expand Up @@ -185,7 +188,9 @@ public readonly byte[] ToByteArray()
{
foreach (var kvp in Iterators)
{
writer.Write(kvp.Key);
var bytes = Encoding.UTF8.GetBytes(kvp.Key);
writer.Write(bytes.Length);
writer.Write(bytes);
writer.Write(kvp.Value);
}
}
Expand All @@ -206,7 +211,7 @@ public int SerializedSize()
if (Iterators != null)
{
foreach (var kvp in Iterators)
iteratorSize += kvp.Key.Length + sizeof(long);
iteratorSize += sizeof(int) + Encoding.UTF8.GetByteCount(kvp.Key) + sizeof(long);
}

return sizeof(int) + 4 * sizeof(long) + iteratorSize + sizeof(int) + (Cookie?.Length ?? 0);
Expand Down
31 changes: 28 additions & 3 deletions cs/src/core/Index/Recovery/Recovery.cs
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,25 @@ internal RecoveryOptions(long headAddress, long fuzzyRegionStartAddress, bool un
}
}

/// <summary>
/// Describes the on-disk extents of the files backing a checkpoint:
/// the snapshot file and the hybrid log file.
/// </summary>
public struct LogFileInfo
{
    /// <summary>End address of the snapshot file (the snapshot file always starts at address 0).</summary>
    public long snapshotFileEndAddress;

    /// <summary>Start address of the hybrid log file.</summary>
    public long hybridLogFileStartAddress;

    /// <summary>End address of the hybrid log file.</summary>
    public long hybridLogFileEndAddress;
}

public partial class FasterKV<Key, Value> : FasterBase, IFasterKV<Key, Value>
{
/// <summary>
Expand Down Expand Up @@ -168,12 +187,18 @@ public long GetLatestCheckpointVersion()
/// </summary>
/// <param name="token"></param>
/// <returns></returns>
public long GetSnapshotFileSizes(Guid token)
public LogFileInfo GetLogFileSize(Guid token)
{
    // Recover the checkpoint metadata for the given token; disposed when we're done.
    using var current = new HybridLogCheckpointInfo();
    current.Recover(token, checkpointManager, hlog.LogPageSizeBits,
        out var _, false);
    // Byte offset of the first flushed snapshot page within the snapshot device:
    // round the snapshot's start-flushed logical address down to a page boundary.
    long snapshotDeviceOffset = hlog.GetPage(current.info.snapshotStartFlushedLogicalAddress) << hlog.LogPageSizeBits;
    return new LogFileInfo
    {
        // Snapshot file is addressed from 0, so subtract the device offset from the
        // final logical address to get the file's end address / size.
        snapshotFileEndAddress = current.info.snapshotFinalLogicalAddress - snapshotDeviceOffset,
        // Hybrid log file starts at the page-aligned begin address of the log.
        // NOTE(review): assumes the log file on disk starts at beginAddress's page — confirm.
        hybridLogFileStartAddress = hlog.GetPage(current.info.beginAddress) << hlog.LogPageSizeBits,
        hybridLogFileEndAddress = current.info.flushedLogicalAddress
    };
}

/// <summary>
Expand All @@ -185,7 +210,7 @@ public long GetIndexFileSize(Guid token)
{
IndexCheckpointInfo recoveredICInfo = new IndexCheckpointInfo();
recoveredICInfo.Recover(token, checkpointManager);
return (long)recoveredICInfo.info.num_ht_bytes;
return (long)(recoveredICInfo.info.num_ht_bytes + recoveredICInfo.info.num_ofb_bytes);
}

private void GetClosestHybridLogCheckpointInfo(
Expand Down
10 changes: 5 additions & 5 deletions cs/src/devices/AzureStorageDevice/AzureStorageDevice.cs
Original file line number Diff line number Diff line change
Expand Up @@ -437,13 +437,13 @@ public override unsafe void ReadAsync(int segmentId, ulong sourceAddress, IntPtr
TimeStamp = DateTime.UtcNow
});

// It is up to the allocator to make sure no reads are issued to segments before they are written
// Lazily cache the blob entry for the segment being read
if (!this.blobs.TryGetValue(segmentId, out BlobEntry blobEntry))
{
var nonLoadedBlob = this.pageBlobDirectory.GetPageBlobClient(this.GetSegmentBlobName(segmentId));
var exception = new InvalidOperationException("Attempt to read a non-loaded segment");
this.BlobManager?.HandleStorageError(nameof(ReadAsync), exception.Message, nonLoadedBlob.Default?.Name, exception, false, true);
throw exception;
var blobClients = this.pageBlobDirectory.GetPageBlobClient(this.GetSegmentBlobName(segmentId));
var entry = new BlobEntry(blobClients, blobClients.Default.GetProperties().Value.ETag, this);
this.blobs.TryAdd(segmentId, entry);
blobEntry = this.blobs[segmentId];
}

this.ReadFromBlobUnsafeAsync(blobEntry.PageBlob, (long)sourceAddress, (long)destinationAddress, readLength, id)
Expand Down

0 comments on commit 0e03937

Please sign in to comment.