Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

various changes from indexing work #562

Merged
merged 2 commits into from
Oct 3, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions cs/src/core/Allocator/IFasterScanIterator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -70,5 +70,15 @@ public interface IFasterScanIterator<Key, Value> : IDisposable
/// Next address
/// </summary>
long NextAddress { get; }

/// <summary>
/// The starting address of the scan
/// </summary>
long BeginAddress { get; }

/// <summary>
/// The ending address of the scan
/// </summary>
long EndAddress { get; }
}
}
7 changes: 6 additions & 1 deletion cs/src/core/Allocator/MemoryPageScanIterator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ namespace FASTER.core
class MemoryPageScanIterator<Key, Value> : IFasterScanIterator<Key, Value>
{
readonly Record<Key, Value>[] page;
readonly int end;
readonly int start, end;
int offset;


Expand All @@ -25,13 +25,18 @@ public MemoryPageScanIterator(Record<Key, Value>[] page, int start, int end)
this.page = new Record<Key, Value>[page.Length];
Array.Copy(page, start, this.page, start, end - start);
offset = start - 1;
this.start = start;
this.end = end;
}

public long CurrentAddress => offset;

public long NextAddress => offset + 1;

public long BeginAddress => start;

public long EndAddress => end;

public void Dispose()
{
}
Expand Down
10 changes: 10 additions & 0 deletions cs/src/core/Allocator/ScanIteratorBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,16 @@ public abstract class ScanIteratorBase
/// </summary>
public long NextAddress => nextAddress;

/// <summary>
/// The starting address of the scan
/// </summary>
public long BeginAddress => beginAddress;

/// <summary>
/// The ending address of the scan
/// </summary>
public long EndAddress => endAddress;

/// <summary>
/// Constructor
/// </summary>
Expand Down
2 changes: 1 addition & 1 deletion cs/src/core/Allocator/WorkQueueLIFO.cs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public void EnqueueAndTryWork(T work, bool asTask)

private void ProcessQueue()
{
// Process items in qork queue
// Process items in work queue
while (true)
{
while (_queue.TryPop(out var workItem))
Expand Down
3 changes: 1 addition & 2 deletions cs/src/core/Index/Common/CompletedOutput.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
// Licensed under the MIT license.

using System;
using System.Collections.Generic;

namespace FASTER.core
{
Expand Down Expand Up @@ -94,7 +93,7 @@ public struct CompletedOutput<TKey, TValue, TInput, TOutput, TContext>
public ref TInput Input => ref inputContainer.Get();

/// <summary>
/// The output for this pending operation.
/// The output for this pending operation. It is the caller's responsibility to dispose this if necessary; <see cref="Dispose()"/> will not try to dispose this member.
/// </summary>
public TOutput Output;

Expand Down
43 changes: 22 additions & 21 deletions cs/src/core/Index/Common/Contexts.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ internal enum OperationType
READ,
RMW,
UPSERT,
INSERT,
DELETE
}

Expand Down Expand Up @@ -266,7 +265,7 @@ public struct HybridLogRecoveryInfo
/// </summary>
public int nextVersion;
/// <summary>
/// Flushed logical address
/// Flushed logical address; indicates the latest immutable address on the main FASTER log at recovery time.
/// </summary>
public long flushedLogicalAddress;
/// <summary>
Expand Down Expand Up @@ -588,37 +587,39 @@ public HybridLogCheckpointInfo Transfer()
}

public void Recover(Guid token, ICheckpointManager checkpointManager, int deltaLogPageSizeBits,
bool scanDelta, long recoverTo)
bool scanDelta = false, long recoverTo = -1)
{
deltaFileDevice = checkpointManager.GetDeltaLogDevice(token);
deltaFileDevice.Initialize(-1);
if (deltaFileDevice.GetFileSize(0) > 0)
if (deltaFileDevice is not null)
{
deltaLog = new DeltaLog(deltaFileDevice, deltaLogPageSizeBits, -1);
deltaLog.InitializeForReads();
info.Recover(token, checkpointManager, deltaLog, scanDelta, recoverTo);
}
else
{
info.Recover(token, checkpointManager, null);
deltaFileDevice.Initialize(-1);
if (deltaFileDevice.GetFileSize(0) > 0)
{
deltaLog = new DeltaLog(deltaFileDevice, deltaLogPageSizeBits, -1);
deltaLog.InitializeForReads();
info.Recover(token, checkpointManager, deltaLog, scanDelta, recoverTo);
return;
}
}
info.Recover(token, checkpointManager, null);
}

public void Recover(Guid token, ICheckpointManager checkpointManager, int deltaLogPageSizeBits,
out byte[] commitCookie, bool scanDelta = false, long recoverTo = -1)
{
deltaFileDevice = checkpointManager.GetDeltaLogDevice(token);
deltaFileDevice.Initialize(-1);
if (deltaFileDevice.GetFileSize(0) > 0)
if (deltaFileDevice is not null)
{
deltaLog = new DeltaLog(deltaFileDevice, deltaLogPageSizeBits, -1);
deltaLog.InitializeForReads();
info.Recover(token, checkpointManager, out commitCookie, deltaLog, scanDelta, recoverTo);
}
else
{
info.Recover(token, checkpointManager, out commitCookie);
deltaFileDevice.Initialize(-1);
if (deltaFileDevice.GetFileSize(0) > 0)
{
deltaLog = new DeltaLog(deltaFileDevice, deltaLogPageSizeBits, -1);
deltaLog.InitializeForReads();
info.Recover(token, checkpointManager, out commitCookie, deltaLog, scanDelta, recoverTo);
return;
}
}
info.Recover(token, checkpointManager, out commitCookie);
}

public bool IsDefault()
Expand Down
3 changes: 1 addition & 2 deletions cs/src/core/Index/Common/HeapContainer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,8 @@ namespace FASTER.core
public interface IHeapContainer<T> : IDisposable
{
/// <summary>
/// Get object
/// Get a reference to the contained object
/// </summary>
/// <returns></returns>
ref T Get();
}

Expand Down
25 changes: 9 additions & 16 deletions cs/src/core/Index/FASTER/FASTER.cs
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,15 @@ public partial class FasterKV<Key, Value> : FasterBase,
{
internal readonly AllocatorBase<Key, Value> hlog;
private readonly AllocatorBase<Key, Value> readcache;
private readonly IFasterEqualityComparer<Key> comparer;

/// <summary>
/// Compares two keys
/// </summary>
protected readonly IFasterEqualityComparer<Key> comparer;

internal readonly bool UseReadCache;
private readonly CopyReadsToTail CopyReadsToTail;
private readonly bool FoldOverSnapshot;
private readonly bool UseFoldOverCheckpoint;
internal readonly int sectorSize;
private readonly bool WriteDefaultOnDelete;
internal bool RelaxedCPR;
Expand Down Expand Up @@ -145,7 +149,7 @@ public FasterKV(long size, LogSettings logSettings,
if (checkpointSettings.CheckpointManager == null)
disposeCheckpointManager = true;

FoldOverSnapshot = checkpointSettings.CheckPointType == core.CheckpointType.FoldOver;
UseFoldOverCheckpoint = checkpointSettings.CheckPointType == core.CheckpointType.FoldOver;
CopyReadsToTail = logSettings.CopyReadsToTail;

if (logSettings.ReadCacheSettings != null)
Expand Down Expand Up @@ -244,7 +248,7 @@ public FasterKV(long size, LogSettings logSettings,
/// operation such as growing the index). Use CompleteCheckpointAsync to wait completion.
/// </returns>
public bool TakeFullCheckpoint(out Guid token, long targetVersion = -1)
=> TakeFullCheckpoint(out token, this.FoldOverSnapshot ? CheckpointType.FoldOver : CheckpointType.Snapshot, targetVersion);
=> TakeFullCheckpoint(out token, this.UseFoldOverCheckpoint ? CheckpointType.FoldOver : CheckpointType.Snapshot, targetVersion);

/// <summary>
/// Initiate full checkpoint
Expand Down Expand Up @@ -353,17 +357,7 @@ public bool TakeIndexCheckpoint(out Guid token)
/// </param>
/// <returns>Whether we could initiate the checkpoint. Use CompleteCheckpointAsync to wait completion.</returns>
public bool TakeHybridLogCheckpoint(out Guid token, long targetVersion = -1)
{
ISynchronizationTask backend;
if (FoldOverSnapshot)
backend = new FoldOverCheckpointTask();
else
backend = new SnapshotCheckpointTask();

var result = StartStateMachine(new HybridLogCheckpointStateMachine(backend, targetVersion));
token = _hybridLogCheckpointToken;
return result;
}
=> TakeHybridLogCheckpoint(out token, UseFoldOverCheckpoint ? CheckpointType.FoldOver : CheckpointType.Snapshot, tryIncremental: false, targetVersion);

/// <summary>
/// Initiate log-only checkpoint
Expand Down Expand Up @@ -631,7 +625,6 @@ internal Status ContextUpsert<Input, Output, Context, FasterSession>(ref Key key
while (internalStatus == OperationStatus.RETRY_NOW);

Status status;

if (internalStatus == OperationStatus.SUCCESS || internalStatus == OperationStatus.NOTFOUND)
{
status = (Status)internalStatus;
Expand Down
14 changes: 8 additions & 6 deletions cs/src/core/Index/FASTER/FASTERImpl.cs
Original file line number Diff line number Diff line change
Expand Up @@ -396,7 +396,7 @@ internal OperationStatus InternalUpsert<Input, Output, Context, FasterSession>(
goto CreateNewRecord;
}

#region Entry latch operation
#region Entry latch operation
if (sessionCtx.phase != Phase.REST)
{
latchDestination = AcquireLatchUpsert(sessionCtx, bucket, ref status, ref latchOperation, ref entry, logicalAddress);
Expand Down Expand Up @@ -717,8 +717,8 @@ internal OperationStatus InternalRMW<Input, Output, Context, FasterSession>(
{
ref RecordInfo recordInfo = ref hlog.GetInfo(physicalAddress);
if (!recordInfo.Tombstone)
{
if (FoldOverSnapshot)
{
if (UseFoldOverCheckpoint)
{
Debug.Assert(recordInfo.Version == sessionCtx.version);
}
Expand Down Expand Up @@ -780,9 +780,9 @@ internal OperationStatus InternalRMW<Input, Output, Context, FasterSession>(
}
}

#endregion
#endregion

#region Create new record
#region Create new record
CreateNewRecord:
if (latchDestination != LatchDestination.CreatePendingContext)
{
Expand Down Expand Up @@ -1402,7 +1402,9 @@ internal void InternalContinuePendingReadCopyToTail<Input, Output, Context, Fast
{
Debug.Assert(RelaxedCPR || pendingContext.version == opCtx.version);

// If NoKey, we do not have the key in the initial call and must use the key from the satisfied request.
ref Key key = ref pendingContext.NoKey ? ref hlog.GetContextRecordKey(ref request) : ref pendingContext.key.Get();

byte* physicalAddress = request.record.GetValidPointer();
long logicalAddress = pendingContext.entry.Address;
ref RecordInfo oldRecordInfo = ref hlog.GetInfoFromBytePointer(physicalAddress);
Expand Down Expand Up @@ -2007,7 +2009,7 @@ internal OperationStatus InternalTryCopyToTail<Input, Output, Context, FasterSes

#endregion

#region Split Index
#region Split Index
private void SplitBuckets(long hash)
{
long masked_bucket_index = hash & state[1 - resizeInfo.version].size_mask;
Expand Down
4 changes: 4 additions & 0 deletions cs/src/core/Index/FASTER/FASTERIterator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,10 @@ public FasterKVIterator(FasterKV<Key, Value> fht, Functions functions, long unti

public long NextAddress => enumerationPhase == 0 ? iter1.NextAddress : iter2.NextAddress;

public long BeginAddress => enumerationPhase == 0 ? iter1.BeginAddress : iter2.BeginAddress;

public long EndAddress => enumerationPhase == 0 ? iter1.EndAddress : iter2.EndAddress;

public void Dispose()
{
iter1?.Dispose();
Expand Down
5 changes: 4 additions & 1 deletion cs/src/core/Index/Interfaces/IFasterSession.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
namespace FASTER.core
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT license.

namespace FASTER.core
{
/// <summary>
/// Provides thread management and callback to checkpoint completion (called state machine).
Expand Down
2 changes: 1 addition & 1 deletion cs/src/core/Index/Recovery/ICheckpointManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ public interface ICheckpointManager : IDisposable
/// <param name="scanDelta"> whether or not to scan through the delta log to acquire latest entry</param>
/// <param name="recoverTo"> version upper bound to scan for in the delta log. Function will return the largest version metadata no greater than the given version.</param>
/// <returns>Metadata, or null if invalid</returns>
byte[] GetLogCheckpointMetadata(Guid logToken, DeltaLog deltaLog, bool scanDelta, long recoverTo);
byte[] GetLogCheckpointMetadata(Guid logToken, DeltaLog deltaLog, bool scanDelta = false, long recoverTo = -1);

/// <summary>
/// Get list of index checkpoint tokens, in order of usage preference
Expand Down
2 changes: 2 additions & 0 deletions cs/src/core/Index/Recovery/Recovery.cs
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,7 @@ private void InternalRecover(IndexCheckpointInfo recoveredICInfo, HybridLogCheck
// Recover session information
hlog.RecoveryReset(tailAddress, headAddress, recoveredHLCInfo.info.beginAddress, readOnlyAddress);
_recoveredSessions = recoveredHLCInfo.info.continueTokens;

checkpointManager.OnRecovery(recoveredICInfo.info.token, recoveredHLCInfo.info.guid);
recoveredHLCInfo.Dispose();
}
Expand Down Expand Up @@ -348,6 +349,7 @@ await RecoverHybridLogFromSnapshotFileAsync(recoveredHLCInfo.info.flushedLogical
hlog.RecoveryReset(tailAddress, headAddress, recoveredHLCInfo.info.beginAddress, readOnlyAddress);
_recoveredSessions = recoveredHLCInfo.info.continueTokens;

checkpointManager.OnRecovery(recoveredICInfo.info.token, recoveredHLCInfo.info.guid);
recoveredHLCInfo.Dispose();
}

Expand Down
Loading