Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[C#] Option to create barrier at checkpoint version switch, support rollback #813

Merged
merged 12 commits into from
Apr 18, 2023
118 changes: 118 additions & 0 deletions cs/src/core/Allocator/AllocatorBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

using System;
using System.Diagnostics;
using System.Globalization;
using System.IO;
using System.Runtime.CompilerServices;
using System.Runtime.InteropServices;
Expand Down Expand Up @@ -167,6 +168,11 @@ public abstract partial class AllocatorBase<Key, Value> : IDisposable
/// The lowest valid address in the log
/// </summary>
public long BeginAddress;

/// <summary>
/// The lowest valid address on disk - updated when truncating log
/// </summary>
public long PersistedBeginAddress;

/// <summary>
/// Address until which we are currently closing. Used to coordinate linear closing of pages.
Expand Down Expand Up @@ -877,6 +883,99 @@ public AllocatorBase(LogSettings settings, IFasterEqualityComparer<Key> comparer
/// </summary>
internal abstract int OverflowPageCount { get; }


/// <summary>
/// Reset the hybrid log. WARNING: assumes that threads have drained out at this point.
/// </summary>
public virtual void Reset()
{
    var tail = GetTailAddress();

    // Advance the read-only markers up to the tail without triggering a flush.
    Utility.MonotonicUpdate(ref ReadOnlyAddress, tail, out _);
    Utility.MonotonicUpdate(ref SafeReadOnlyAddress, tail, out _);

    // Move the head to the tail as well; this closes out every in-memory page.
    if (Utility.MonotonicUpdate(ref HeadAddress, tail, out _))
    {
        // Initiate closure of all addresses below the new head.
        OnPagesClosed(tail);

        // Spin until page closure has caught up to the new head.
        while (ClosedUntilAddress < tail)
        {
            Thread.Yield();
            if (epoch.ThisInstanceProtected())
                epoch.ProtectAndDrain();
        }
    }

    // Finally, advance the begin address so the log is logically empty.
    Utility.MonotonicUpdate(ref BeginAddress, tail, out _);

    this.FlushEvent.Initialize();
    Array.Clear(PageStatusIndicator, 0, BufferSize);
    for (var slot = 0; slot < BufferSize; slot++)
        PendingFlush[slot].list.Clear();
}

/// <summary>
/// Verify that the segments available on the device cover the address range required to
/// recover the given checkpoint; optionally trim disk segments outside that range.
/// </summary>
/// <param name="recoveredHLCInfo">Hybrid log checkpoint metadata being recovered</param>
/// <param name="trimLog">Whether to trim the log to the minimum segment range necessary for recovery to the given checkpoint</param>
/// <exception cref="FasterException">Thrown when the device lacks a segment required for recovery</exception>
internal void VerifyRecoveryInfo(HybridLogCheckpointInfo recoveredHLCInfo, bool trimLog = false)
{
    var diskBeginAddress = recoveredHLCInfo.info.beginAddress;
    // Fold-over checkpoints recover up to finalLogicalAddress; snapshot checkpoints up to flushedLogicalAddress.
    var diskFlushedUntilAddress =
        recoveredHLCInfo.info.useSnapshotFile == 0 ?
        recoveredHLCInfo.info.finalLogicalAddress :
        recoveredHLCInfo.info.flushedLogicalAddress;

    // First valid disk segment required for recovery.
    // BUGFIX: removed the narrowing (int) cast; it truncated the segment number
    // before assigning to the long, breaking very large logical addresses.
    long firstValidSegment = diskBeginAddress >> LogSegmentSizeBits;

    // Last valid disk segment required for recovery; if the flushed address lies exactly
    // on a segment boundary, that boundary segment itself is not needed.
    int lastValidSegment = (int)(diskFlushedUntilAddress >> LogSegmentSizeBits);
    if ((diskFlushedUntilAddress & ((1L << LogSegmentSizeBits) - 1)) == 0)
        lastValidSegment--;

    logger?.LogInformation("Recovery requires disk segments in range [{firstSegment}--{tailStartSegment}]", firstValidSegment, lastValidSegment);

    var firstAvailSegment = device.StartSegment;
    var lastAvailSegment = device.EndSegment;

    // If we have flushed beyond the first valid logical address, the in-memory flushed
    // frontier may extend past what the device reports as its last segment.
    if (FlushedUntilAddress > GetFirstValidLogicalAddress(0))
    {
        int currTailSegment = (int)(FlushedUntilAddress >> LogSegmentSizeBits);
        if ((FlushedUntilAddress & ((1L << LogSegmentSizeBits) - 1)) == 0)
            currTailSegment--;

        if (currTailSegment > lastAvailSegment)
            lastAvailSegment = currTailSegment;
    }

    logger?.LogInformation("Available segment range on device: [{firstAvailSegment}--{lastAvailSegment}]", firstAvailSegment, lastAvailSegment);

    if (firstValidSegment < firstAvailSegment)
        throw new FasterException($"Unable to set first valid segment to {firstValidSegment}, first available segment on disk is {firstAvailSegment}");

    if (lastAvailSegment >= 0 && lastValidSegment > lastAvailSegment)
        throw new FasterException($"Unable to set last valid segment to {lastValidSegment}, last available segment on disk is {lastAvailSegment}");

    if (trimLog)
    {
        // Trim everything strictly below the first segment needed for recovery...
        logger?.LogInformation("Trimming disk segments until (not including) {firstSegment}", firstValidSegment);
        TruncateUntilAddressBlocking(firstValidSegment << LogSegmentSizeBits);

        // ...and every tail segment past the last one needed for recovery.
        for (int s = lastValidSegment + 1; s <= lastAvailSegment; s++)
        {
            logger?.LogInformation("Trimming tail segment {s} on disk", s);
            RemoveSegment(s);
        }
    }
}

/// <summary>
/// Initialize allocator
/// </summary>
Expand Down Expand Up @@ -1294,9 +1393,28 @@ public void ShiftBeginAddress(long newBeginAddress, bool truncateLog)
/// <param name="toAddress"></param>
protected virtual void TruncateUntilAddress(long toAddress)
{
    // Record the new on-disk begin address eagerly; the device-level truncation
    // itself runs fire-and-forget on the thread pool (discard makes that explicit).
    PersistedBeginAddress = toAddress;
    _ = Task.Run(() => device.TruncateUntilAddress(toAddress));
}

/// <summary>
/// Wraps <see cref="IDevice.TruncateUntilAddress(long)"/> when an allocator potentially has to interact with multiple devices
/// </summary>
/// <param name="toAddress">Address below which on-disk data is no longer required</param>
protected virtual void TruncateUntilAddressBlocking(long toAddress)
{
// Synchronous counterpart of TruncateUntilAddress: truncates on the calling thread.
device.TruncateUntilAddress(toAddress);
}

/// <summary>
/// Remove disk segment
/// </summary>
/// <param name="segment">Segment number to remove from the device</param>
protected virtual void RemoveSegment(int segment)
{
// Virtual so allocators with additional devices (e.g. an object log) can also remove their segment.
device.RemoveSegment(segment);
}

internal virtual bool TryComplete()
{
return device.TryComplete();
Expand Down
26 changes: 26 additions & 0 deletions cs/src/core/Allocator/BlittableAllocator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,32 @@ public BlittableAllocator(LogSettings settings, IFasterEqualityComparer<Key> com
}
}

/// <summary>
/// Reset the allocator: return all allocated in-memory pages to the overflow pool,
/// then re-initialize to the first valid address.
/// </summary>
public override void Reset()
{
    base.Reset();

    for (int slot = 0; slot < BufferSize; slot++)
    {
        if (values[slot] == null)
            continue;

        // Hand the page (and, pre-.NET 5, its pinning handle) back to the pool.
        overflowPagePool.TryAdd(new PageUnit
        {
#if !NET5_0_OR_GREATER
            handle = handles[slot],
#endif
            pointer = pointers[slot],
            value = values[slot]
        });
        values[slot] = null;
        pointers[slot] = 0;
#if !NET5_0_OR_GREATER
        handles[slot] = default;
#endif
        Interlocked.Decrement(ref AllocatedPageCount);
    }
    Initialize();
}

public override void Initialize()
{
Initialize(Constants.kFirstValidAddress);
Expand Down
29 changes: 29 additions & 0 deletions cs/src/core/Allocator/GenericAllocator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,23 @@ public GenericAllocator(LogSettings settings, SerializerSettings<Key, Value> ser

internal override int OverflowPageCount => overflowPagePool.Count;

/// <summary>
/// Reset the allocator: return all allocated in-memory pages to the overflow pool,
/// clear per-segment object-log offsets, then re-initialize.
/// </summary>
public override void Reset()
{
    base.Reset();
    for (int index = 0; index < BufferSize; index++)
    {
        if (values[index] != default)
        {
            // CONSISTENCY FIX: index is already in [0, BufferSize), so the redundant
            // 'index % BufferSize' indexing was dropped; access now matches the guard
            // above and the sibling allocators' Reset implementations.
            overflowPagePool.TryAdd(values[index]);
            values[index] = default;
            Interlocked.Decrement(ref AllocatedPageCount);
        }
    }

    // Object-log segment offsets are stale after reset; clear them all.
    Array.Clear(segmentOffsets, 0, segmentOffsets.Length);
    Initialize();
}

public override void Initialize()
{
Initialize(recordSize);
Expand Down Expand Up @@ -267,6 +284,18 @@ protected override void TruncateUntilAddress(long toAddress)
objectLogDevice.TruncateUntilSegment((int)(toAddress >> LogSegmentSizeBits));
}

// Truncates the main log device, then the matching segments of the object-log device.
protected override void TruncateUntilAddressBlocking(long toAddress)
{
base.TruncateUntilAddressBlocking(toAddress);
objectLogDevice.TruncateUntilSegment((int)(toAddress >> LogSegmentSizeBits));
}

// Removes the segment from the main log device and the object-log device.
protected override void RemoveSegment(int segment)
{
base.RemoveSegment(segment);
objectLogDevice.RemoveSegment(segment);
}

protected override void WriteAsync<TContext>(long flushPage, DeviceIOCompletionCallback callback, PageAsyncFlushResult<TContext> asyncResult)
{
WriteAsync(flushPage,
Expand Down
26 changes: 26 additions & 0 deletions cs/src/core/Allocator/VarLenBlittableAllocator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,32 @@ public VariableLengthBlittableAllocator(LogSettings settings, VariableLengthStru

internal override int OverflowPageCount => overflowPagePool.Count;

/// <summary>
/// Reset the allocator: return all allocated in-memory pages to the overflow pool,
/// then re-initialize to the first valid address.
/// </summary>
public override void Reset()
{
    base.Reset();

    for (int slot = 0; slot < BufferSize; slot++)
    {
        if (values[slot] == null)
            continue;

        // Hand the page (and, pre-.NET 5, its pinning handle) back to the pool.
        overflowPagePool.TryAdd(new PageUnit
        {
#if !NET5_0_OR_GREATER
            handle = handles[slot],
#endif
            pointer = pointers[slot],
            value = values[slot]
        });
        values[slot] = null;
        pointers[slot] = 0;
#if !NET5_0_OR_GREATER
        handles[slot] = default;
#endif
        Interlocked.Decrement(ref AllocatedPageCount);
    }
    Initialize();
}

public override void Initialize()
{
Initialize(Constants.kFirstValidAddress);
Expand Down
18 changes: 9 additions & 9 deletions cs/src/core/ClientSession/ClientSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -377,7 +377,7 @@ public Status ReadAtAddress(ref Input input, ref Output output, ref ReadOptions
/// <inheritdoc/>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public ValueTask<FasterKV<Key, Value>.ReadAsyncResult<Input, Output, Context>> ReadAsync(ref Key key, ref Input input, ref ReadOptions readOptions,
Context userContext = default, long serialNo = 0, CancellationToken cancellationToken = default)
Context userContext = default, long serialNo = 0, CancellationToken cancellationToken = default)
=> fht.ReadAsync(this.FasterSession, ref key, ref input, ref readOptions, userContext, serialNo, cancellationToken);

/// <inheritdoc/>
Expand Down Expand Up @@ -448,12 +448,12 @@ public Status Upsert(Key key, Input input, Value desiredValue, ref Output output

/// <inheritdoc/>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public ValueTask<FasterKV<Key, Value>.UpsertAsyncResult<Input, Output, Context>> UpsertAsync(ref Key key, ref Input input, ref Value desiredValue, Context userContext = default, long serialNo = 0, CancellationToken token = default)
public ValueTask<FasterKV<Key, Value>.UpsertAsyncResult<Input, Output, Context>> UpsertAsync(ref Key key, ref Input input, ref Value desiredValue, Context userContext = default, long serialNo = 0, CancellationToken token = default)
=> fht.UpsertAsync<Input, Output, Context, InternalFasterSession>(this.FasterSession, ref key, ref input, ref desiredValue, userContext, serialNo, token);

/// <inheritdoc/>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public ValueTask<FasterKV<Key, Value>.UpsertAsyncResult<Input, Output, Context>> UpsertAsync(Key key, Value desiredValue, Context userContext = default, long serialNo = 0, CancellationToken token = default)
public ValueTask<FasterKV<Key, Value>.UpsertAsyncResult<Input, Output, Context>> UpsertAsync(Key key, Value desiredValue, Context userContext = default, long serialNo = 0, CancellationToken token = default)
=> UpsertAsync(ref key, ref desiredValue, userContext, serialNo, token);

/// <inheritdoc/>
Expand All @@ -463,7 +463,7 @@ public Status Upsert(Key key, Input input, Value desiredValue, ref Output output

/// <inheritdoc/>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public Status RMW(ref Key key, ref Input input, ref Output output, Context userContext = default, long serialNo = 0)
public Status RMW(ref Key key, ref Input input, ref Output output, Context userContext = default, long serialNo = 0)
=> RMW(ref key, ref input, ref output, out _, userContext, serialNo);

/// <inheritdoc/>
Expand Down Expand Up @@ -507,7 +507,7 @@ public Status RMW(Key key, Input input, Context userContext = default, long seri

/// <inheritdoc/>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public ValueTask<FasterKV<Key, Value>.RmwAsyncResult<Input, Output, Context>> RMWAsync(ref Key key, ref Input input, Context context = default, long serialNo = 0, CancellationToken token = default)
public ValueTask<FasterKV<Key, Value>.RmwAsyncResult<Input, Output, Context>> RMWAsync(ref Key key, ref Input input, Context context = default, long serialNo = 0, CancellationToken token = default)
=> fht.RmwAsync<Input, Output, Context, InternalFasterSession>(this.FasterSession, ref key, ref input, context, serialNo, token);

/// <inheritdoc/>
Expand Down Expand Up @@ -537,7 +537,7 @@ public Status Delete(Key key, Context userContext = default, long serialNo = 0)

/// <inheritdoc/>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public ValueTask<FasterKV<Key, Value>.DeleteAsyncResult<Input, Output, Context>> DeleteAsync(ref Key key, Context userContext = default, long serialNo = 0, CancellationToken token = default)
public ValueTask<FasterKV<Key, Value>.DeleteAsyncResult<Input, Output, Context>> DeleteAsync(ref Key key, Context userContext = default, long serialNo = 0, CancellationToken token = default)
=> fht.DeleteAsync<Input, Output, Context, InternalFasterSession>(this.FasterSession, ref key, userContext, serialNo, token);

/// <inheritdoc/>
Expand Down Expand Up @@ -802,7 +802,7 @@ public async ValueTask WaitForCommitAsync(CancellationToken token = default)
/// <param name="compactUntilAddress">Compact log until this address</param>
/// <param name="compactionType">Compaction type (whether we lookup records or scan log for liveness checking)</param>
/// <returns>Address until which compaction was done</returns>
public long Compact(long compactUntilAddress, CompactionType compactionType = CompactionType.Scan)
public long Compact(long compactUntilAddress, CompactionType compactionType = CompactionType.Scan)
=> Compact(compactUntilAddress, compactionType, default(DefaultCompactionFunctions<Key, Value>));

/// <summary>
Expand Down Expand Up @@ -830,7 +830,7 @@ public long Compact<CompactionFunctions>(long untilAddress, CompactionType compa
{
Input input = default;
Output output = default;
return fht.Compact<Input, Output, Context, Functions, CompactionFunctions>(functions, compactionFunctions, ref input, ref output, untilAddress, compactionType,
return fht.Compact<Input, Output, Context, Functions, CompactionFunctions>(functions, compactionFunctions, ref input, ref output, untilAddress, compactionType,
new SessionVariableLengthStructSettings<Value, Input> { valueLength = variableLengthStruct, inputLength = inputVariableLengthStruct });
}

Expand Down Expand Up @@ -942,7 +942,7 @@ void IClientSession.AtomicSwitch(long version)
/// </summary>
internal bool IsInPreparePhase()
{
    // Both PREPARE and PREPARE_GROW count as prepare phases for the session.
    var phase = this.fht.SystemState.Phase;
    return phase == Phase.PREPARE || phase == Phase.PREPARE_GROW;
}

#endregion Other Operations
Expand Down
33 changes: 32 additions & 1 deletion cs/src/core/FasterLog/FasterLog.cs
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,37 @@ private FasterLog(FasterLogSettings logSettings, bool syncRecover, ILogger logge
}
}

/// <summary>
/// Initialize new log instance with specific begin address and (optional) last commit number
/// </summary>
/// <param name="beginAddress">Begin address for the fresh log; 0 maps to the first valid address</param>
/// <param name="lastCommitNum">Last commit number to resume from; 0 means no prior commit</param>
public void Initialize(long beginAddress, long lastCommitNum = 0)
{
    Debug.Assert(!readOnlyMode);

    // A zero begin address means "start of log".
    if (beginAddress == 0)
        beginAddress = Constants.kFirstValidAddress;

    try
    {
        // Restore the hybrid log with all four address markers set to beginAddress.
        allocator.RestoreHybridLog(beginAddress, beginAddress, beginAddress, beginAddress);
    }
    catch
    {
        // Swallow device failures only when the caller opted into tolerating them.
        if (!tolerateDeviceFailure) throw;
    }

    // All commit/tail bookkeeping starts at the chosen begin address.
    CommittedUntilAddress = beginAddress;
    CommittedBeginAddress = beginAddress;
    SafeTailAddress = beginAddress;

    commitNum = lastCommitNum;
    this.beginAddress = beginAddress;

    // Let the commit manager clean up state relative to the resumed commit number.
    if (lastCommitNum > 0) logCommitManager.OnRecovery(lastCommitNum);
}

/// <summary>
/// Recover FasterLog to the specific commit number, or latest if -1
/// </summary>
Expand Down Expand Up @@ -2168,7 +2199,7 @@ private void RestoreSpecificCommit(long requestedCommitNum, out Dictionary<strin

iterators = CompleteRestoreFromCommit(info);
cookie = info.Cookie;
commitNum = info.CommitNum;
commitNum = persistedCommitNum = info.CommitNum;
beginAddress = allocator.BeginAddress;
if (readOnlyMode)
allocator.HeadAddress = long.MaxValue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -484,5 +484,10 @@ protected unsafe void WriteInto(IDevice device, ulong address, byte[] buffer, in

pbuffer.Return();
}

/// <inheritdoc />
public void CheckpointVersionShift(long oldVersion, long newVersion)
{
// Intentionally a no-op: this manager takes no action on checkpoint version shifts.
}
}
}
Loading