Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[C#] Option to create barrier at checkpoint version switch, support rollback #813

Merged
merged 12 commits into from
Apr 18, 2023
118 changes: 118 additions & 0 deletions cs/src/core/Allocator/AllocatorBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

using System;
using System.Diagnostics;
using System.Globalization;
using System.IO;
using System.Runtime.CompilerServices;
using System.Runtime.InteropServices;
Expand Down Expand Up @@ -167,6 +168,11 @@ public abstract partial class AllocatorBase<Key, Value> : IDisposable
/// The lowest valid address in the log
/// </summary>
public long BeginAddress;

/// <summary>
/// The lowest valid address on disk - updated when truncating log
/// </summary>
public long PersistedBeginAddress;

/// <summary>
/// Address until which we are currently closing. Used to coordinate linear closing of pages.
Expand Down Expand Up @@ -877,6 +883,99 @@ public AllocatorBase(LogSettings settings, IFasterEqualityComparer<Key> comparer
/// </summary>
internal abstract int OverflowPageCount { get; }


/// <summary>
/// Reset the hybrid log. WARNING: assumes that threads have drained out at this point.
/// </summary>
public virtual void Reset()
{
var newBeginAddress = GetTailAddress();

// Shift read-only addresses to tail without flushing
Utility.MonotonicUpdate(ref ReadOnlyAddress, newBeginAddress, out _);
Utility.MonotonicUpdate(ref SafeReadOnlyAddress, newBeginAddress, out _);

// Shift head address to tail
if (Utility.MonotonicUpdate(ref HeadAddress, newBeginAddress, out _))
{
// Close addresses
OnPagesClosed(newBeginAddress);

// Wait for pages to get closed
while (ClosedUntilAddress < newBeginAddress)
{
Thread.Yield();
if (epoch.ThisInstanceProtected())
epoch.ProtectAndDrain();
}
}

// Update begin address to tail
Utility.MonotonicUpdate(ref BeginAddress, newBeginAddress, out _);

this.FlushEvent.Initialize();
Array.Clear(PageStatusIndicator, 0, BufferSize);
for (int i = 0; i < BufferSize; i++)
PendingFlush[i].list.Clear();
}

internal void VerifyRecoveryInfo(HybridLogCheckpointInfo recoveredHLCInfo, bool trimLog = false)
{
// Note: trimLog is unused right now. Can be used to trim the log to the minimum
// segment range necessary for recovery to given checkpoint

var diskBeginAddress = recoveredHLCInfo.info.beginAddress;
var diskFlushedUntilAddress =
recoveredHLCInfo.info.useSnapshotFile == 0 ?
recoveredHLCInfo.info.finalLogicalAddress :
recoveredHLCInfo.info.flushedLogicalAddress;

// Delete disk segments until specified disk begin address

// First valid disk segment required for recovery
long firstValidSegment = (int)(diskBeginAddress >> LogSegmentSizeBits);

// Last valid disk segment required for recovery
int lastValidSegment = (int)(diskFlushedUntilAddress >> LogSegmentSizeBits);
if ((diskFlushedUntilAddress & ((1L << LogSegmentSizeBits) - 1)) == 0)
lastValidSegment--;

logger?.LogInformation("Recovery requires disk segments in range [{firstSegment}--{tailStartSegment}]", firstValidSegment, lastValidSegment);

var firstAvailSegment = device.StartSegment;
var lastAvailSegment = device.EndSegment;

if (FlushedUntilAddress > GetFirstValidLogicalAddress(0))
{
int currTailSegment = (int)(FlushedUntilAddress >> LogSegmentSizeBits);
if ((FlushedUntilAddress & ((1L << LogSegmentSizeBits) - 1)) == 0)
currTailSegment--;

if (currTailSegment > lastAvailSegment)
lastAvailSegment = currTailSegment;
}

logger?.LogInformation("Available segment range on device: [{firstAvailSegment}--{lastAvailSegment}]", firstAvailSegment, lastAvailSegment);

if (firstValidSegment < firstAvailSegment)
throw new FasterException($"Unable to set first valid segment to {firstValidSegment}, first available segment on disk is {firstAvailSegment}");

if (lastAvailSegment >= 0 && lastValidSegment > lastAvailSegment)
throw new FasterException($"Unable to set last valid segment to {lastValidSegment}, last available segment on disk is {lastAvailSegment}");

if (trimLog)
{
logger?.LogInformation("Trimming disk segments until (not including) {firstSegment}", firstValidSegment);
TruncateUntilAddressBlocking(firstValidSegment << LogSegmentSizeBits);

for (int s = lastValidSegment + 1; s <= lastAvailSegment; s++)
{
logger?.LogInformation("Trimming tail segment {s} on disk", s);
RemoveSegment(s);
}
}
}

/// <summary>
/// Initialize allocator
/// </summary>
Expand Down Expand Up @@ -1294,9 +1393,28 @@ public void ShiftBeginAddress(long newBeginAddress, bool truncateLog)
/// <param name="toAddress"></param>
protected virtual void TruncateUntilAddress(long toAddress)
{
PersistedBeginAddress = toAddress;
Task.Run(() => device.TruncateUntilAddress(toAddress));
}

/// <summary>
/// Wraps <see cref="IDevice.TruncateUntilAddress(long)"/> when an allocator potentially has to interact with multiple devices
/// </summary>
/// <param name="toAddress"></param>
protected virtual void TruncateUntilAddressBlocking(long toAddress)
{
device.TruncateUntilAddress(toAddress);
}

/// <summary>
/// Remove disk segment
/// </summary>
/// <param name="segment"></param>
protected virtual void RemoveSegment(int segment)
{
device.RemoveSegment(segment);
}

internal virtual bool TryComplete()
{
return device.TryComplete();
Expand Down
51 changes: 34 additions & 17 deletions cs/src/core/Allocator/BlittableAllocator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
using System.Threading;
using System.Runtime.InteropServices;
using Microsoft.Extensions.Logging;
using System.Diagnostics;

#pragma warning disable CS1591 // Missing XML comment for publicly visible type or member

Expand Down Expand Up @@ -58,6 +59,38 @@ public BlittableAllocator(LogSettings settings, IFasterEqualityComparer<Key> com
}
}

public override void Reset()
{
base.Reset();
for (int index = 0; index < BufferSize; index++)
{
ReturnPage(index);
}
Initialize();
}

void ReturnPage(int index)
{
Debug.Assert(index < BufferSize);
if (values[index] != null)
{
overflowPagePool.TryAdd(new PageUnit
{
#if !NET5_0_OR_GREATER
handle = handles[index],
#endif
pointer = pointers[index],
value = values[index]
});
values[index] = null;
pointers[index] = 0;
#if !NET5_0_OR_GREATER
handles[index] = default;
#endif
Interlocked.Decrement(ref AllocatedPageCount);
}
}

public override void Initialize()
{
Initialize(Constants.kFirstValidAddress);
Expand Down Expand Up @@ -254,23 +287,7 @@ internal override void FreePage(long page)
{
ClearPage(page, 0);
if (EmptyPageCount > 0)
{
int index = (int)(page % BufferSize);
overflowPagePool.TryAdd(new PageUnit
{
#if !NET5_0_OR_GREATER
handle = handles[index],
#endif
pointer = pointers[index],
value = values[index]
});
values[index] = null;
pointers[index] = 0;
#if !NET5_0_OR_GREATER
handles[index] = default;
#endif
Interlocked.Decrement(ref AllocatedPageCount);
}
ReturnPage((int)(page % BufferSize));
}

/// <summary>
Expand Down
41 changes: 36 additions & 5 deletions cs/src/core/Allocator/GenericAllocator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,29 @@ public GenericAllocator(LogSettings settings, SerializerSettings<Key, Value> ser

internal override int OverflowPageCount => overflowPagePool.Count;

public override void Reset()
{
base.Reset();
for (int index = 0; index < BufferSize; index++)
{
ReturnPage(index);
}

Array.Clear(segmentOffsets, 0, segmentOffsets.Length);
Initialize();
}

void ReturnPage(int index)
{
Debug.Assert(index < BufferSize);
if (values[index] != default)
{
overflowPagePool.TryAdd(values[index]);
values[index] = default;
Interlocked.Decrement(ref AllocatedPageCount);
}
}

public override void Initialize()
{
Initialize(recordSize);
Expand Down Expand Up @@ -267,6 +290,18 @@ protected override void TruncateUntilAddress(long toAddress)
objectLogDevice.TruncateUntilSegment((int)(toAddress >> LogSegmentSizeBits));
}

protected override void TruncateUntilAddressBlocking(long toAddress)
{
base.TruncateUntilAddressBlocking(toAddress);
objectLogDevice.TruncateUntilSegment((int)(toAddress >> LogSegmentSizeBits));
}

protected override void RemoveSegment(int segment)
{
base.RemoveSegment(segment);
objectLogDevice.RemoveSegment(segment);
}

protected override void WriteAsync<TContext>(long flushPage, DeviceIOCompletionCallback callback, PageAsyncFlushResult<TContext> asyncResult)
{
WriteAsync(flushPage,
Expand Down Expand Up @@ -332,11 +367,7 @@ internal override void FreePage(long page)
}

if (EmptyPageCount > 0)
{
overflowPagePool.TryAdd(values[page % BufferSize]);
values[page % BufferSize] = default;
Interlocked.Decrement(ref AllocatedPageCount);
}
ReturnPage((int)(page % BufferSize));
}

private void WriteAsync<TContext>(long flushPage, ulong alignedDestinationAddress, uint numBytesToWrite,
Expand Down
51 changes: 34 additions & 17 deletions cs/src/core/Allocator/VarLenBlittableAllocator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

using Microsoft.Extensions.Logging;
using System;
using System.Diagnostics;
using System.Runtime.CompilerServices;
using System.Runtime.InteropServices;
using System.Threading;
Expand Down Expand Up @@ -75,6 +76,38 @@ public VariableLengthBlittableAllocator(LogSettings settings, VariableLengthStru

internal override int OverflowPageCount => overflowPagePool.Count;

public override void Reset()
{
base.Reset();
for (int index = 0; index < BufferSize; index++)
{
ReturnPage(index);
}
Initialize();
}

void ReturnPage(int index)
{
Debug.Assert(index < BufferSize);
if (values[index] != null)
{
overflowPagePool.TryAdd(new PageUnit
{
#if !NET5_0_OR_GREATER
handle = handles[index],
#endif
pointer = pointers[index],
value = values[index]
});
values[index] = null;
pointers[index] = 0;
#if !NET5_0_OR_GREATER
handles[index] = default;
#endif
Interlocked.Decrement(ref AllocatedPageCount);
}
}

public override void Initialize()
{
Initialize(Constants.kFirstValidAddress);
Expand Down Expand Up @@ -357,23 +390,7 @@ internal override void FreePage(long page)
{
ClearPage(page, 0);
if (EmptyPageCount > 0)
{
int index = (int)(page % BufferSize);
overflowPagePool.TryAdd(new PageUnit
{
#if !NET5_0_OR_GREATER
handle = handles[index],
#endif
pointer = pointers[index],
value = values[index]
});
values[index] = null;
pointers[index] = 0;
#if !NET5_0_OR_GREATER
handles[index] = default;
#endif
Interlocked.Decrement(ref AllocatedPageCount);
}
ReturnPage((int)(page % BufferSize));
}

/// <summary>
Expand Down
Loading