Skip to content

Commit

Permalink
Allow checkpoint type to be chosen per instantiation (#285)
Browse files Browse the repository at this point in the history
* Allow checkpoint type to be chosen per instantiation
* Add custom checkpoint overloads to legacy API as well.
  • Loading branch information
badrishc committed Jul 13, 2020
1 parent dc38f04 commit cf0f199
Show file tree
Hide file tree
Showing 6 changed files with 73 additions and 23 deletions.
26 changes: 12 additions & 14 deletions cs/src/core/Index/Common/Contexts.cs
Original file line number Diff line number Diff line change
Expand Up @@ -293,14 +293,6 @@ internal void Recover(Guid token, ICheckpointManager checkpointManager)
Initialize(s);
}

/// <summary>
/// Reset
/// </summary>
public void Reset()
{
Initialize(default, -1);
}

/// <summary>
/// Write info to byte array
/// </summary>
Expand Down Expand Up @@ -374,29 +366,30 @@ internal struct HybridLogCheckpointInfo
public IDevice snapshotFileDevice;
public IDevice snapshotFileObjectLogDevice;
public SemaphoreSlim flushedSemaphore;
public long started;

public void Initialize(Guid token, int _version, ICheckpointManager checkpointManager)
{
info.Initialize(token, _version);
started = 0;
checkpointManager.InitializeLogCheckpoint(token);
}

public void Recover(Guid token, ICheckpointManager checkpointManager)
{
info.Recover(token, checkpointManager);
started = 0;
}

public void Reset()
{
started = 0;
flushedSemaphore = null;
info.Reset();
info = default;
if (snapshotFileDevice != null) snapshotFileDevice.Close();
if (snapshotFileObjectLogDevice != null) snapshotFileObjectLogDevice.Close();
}

public bool IsDefault()
{
return info.guid == default;
}
}

internal struct IndexRecoveryInfo
Expand Down Expand Up @@ -514,8 +507,13 @@ public void Recover(Guid token, ICheckpointManager checkpointManager)

public void Reset()
{
info.Reset();
info = default;
main_ht_device.Close();
}

public bool IsDefault()
{
return info.token == default;
}
}
}
48 changes: 48 additions & 0 deletions cs/src/core/Index/FASTER/FASTER.cs
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,32 @@ public bool TakeFullCheckpoint(out Guid token, long targetVersion = -1)
return result;
}

/// <summary>
/// Initiate full checkpoint
/// </summary>
/// <param name="token">Checkpoint token</param>
/// <param name="checkpointType">Checkpoint type</param>
/// <param name="targetVersion">upper limit (inclusive) of the version included</param>
/// <returns>
/// Whether we successfully initiated the checkpoint (initiation may
/// fail if we are already taking a checkpoint or performing some other
/// operation such as growing the index).
/// </returns>
public bool TakeFullCheckpoint(out Guid token, CheckpointType checkpointType, long targetVersion = -1)
{
ISynchronizationTask backend;
if (checkpointType == CheckpointType.FoldOver)
backend = new FoldOverCheckpointTask();
else if (checkpointType == CheckpointType.Snapshot)
backend = new SnapshotCheckpointTask();
else
throw new FasterException("Unsupported full checkpoint type");

var result = StartStateMachine(new FullCheckpointStateMachine(backend, targetVersion));
token = _hybridLogCheckpointToken;
return result;
}

/// <summary>
/// Initiate index checkpoint
/// </summary>
Expand Down Expand Up @@ -235,6 +261,28 @@ public bool TakeHybridLogCheckpoint(out Guid token, long targetVersion = -1)
return result;
}

/// <summary>
/// Take incremental hybrid log checkpoint
/// </summary>
/// <param name="token">Checkpoint token</param>
/// <param name="checkpointType">Checkpoint type</param>
/// <param name="targetVersion">upper limit (inclusive) of the version included</param>
/// <returns>Whether we could initiate the checkpoint</returns>
public bool TakeHybridLogCheckpoint(out Guid token, CheckpointType checkpointType, long targetVersion = -1)
{
ISynchronizationTask backend;
if (checkpointType == CheckpointType.FoldOver)
backend = new FoldOverCheckpointTask();
else if (checkpointType == CheckpointType.Snapshot)
backend = new SnapshotCheckpointTask();
else
throw new FasterException("Unsupported checkpoint type");

var result = StartStateMachine(new HybridLogCheckpointStateMachine(backend, targetVersion));
token = _hybridLogCheckpointToken;
return result;
}

/// <summary>
/// Recover from the latest checkpoints
/// </summary>
Expand Down
6 changes: 6 additions & 0 deletions cs/src/core/Index/FASTER/FASTERLegacy.cs
Original file line number Diff line number Diff line change
Expand Up @@ -271,12 +271,18 @@ public ClientSession<Key, Value, Input, Output, Context, Functions> ResumeSessio
/// <inheritdoc />
public bool TakeFullCheckpoint(out Guid token, long targetVersion = -1) => _fasterKV.TakeFullCheckpoint(out token, targetVersion);

/// <inheritdoc />
public bool TakeFullCheckpoint(out Guid token, CheckpointType checkpointType, long targetVersion = -1) => _fasterKV.TakeFullCheckpoint(out token, checkpointType, targetVersion);

/// <inheritdoc />
public bool TakeIndexCheckpoint(out Guid token) => _fasterKV.TakeIndexCheckpoint(out token);

/// <inheritdoc />
public bool TakeHybridLogCheckpoint(out Guid token, long targetVersion = -1) => _fasterKV.TakeHybridLogCheckpoint(out token, targetVersion);

/// <inheritdoc />
public bool TakeHybridLogCheckpoint(out Guid token, CheckpointType checkpointType, long targetVersion = -1) => _fasterKV.TakeHybridLogCheckpoint(out token, checkpointType, targetVersion);

/// <inheritdoc />
public void Recover() => _fasterKV.Recover();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ public void GlobalBeforeEnteringState<Key, Value>(
switch (next.phase)
{
case Phase.PREP_INDEX_CHECKPOINT:
Debug.Assert(faster._indexCheckpointToken == default &&
faster._hybridLogCheckpointToken == default);
Debug.Assert(faster._indexCheckpoint.IsDefault() &&
faster._hybridLogCheckpoint.IsDefault());
var fullCheckpointToken = Guid.NewGuid();
faster._indexCheckpointToken = fullCheckpointToken;
faster._hybridLogCheckpointToken = fullCheckpointToken;
Expand All @@ -39,7 +39,7 @@ public void GlobalBeforeEnteringState<Key, Value>(
break;
case Phase.PERSISTENCE_CALLBACK:
faster.WriteIndexMetaInfo();
faster._indexCheckpointToken = default;
faster._indexCheckpoint.Reset();
break;
}
}
Expand Down
3 changes: 1 addition & 2 deletions cs/src/core/Index/Synchronization/HybridLogCheckpointTask.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ public virtual void GlobalBeforeEnteringState<Key, Value>(SystemState next,
switch (next.phase)
{
case Phase.PREPARE:
if (faster._hybridLogCheckpointToken == default)
if (faster._hybridLogCheckpoint.IsDefault())
{
faster._hybridLogCheckpointToken = Guid.NewGuid();
faster.InitializeHybridLogCheckpoint(faster._hybridLogCheckpointToken, next.version);
Expand Down Expand Up @@ -52,7 +52,6 @@ public virtual void GlobalBeforeEnteringState<Key, Value>(SystemState next,
faster.WriteHybridLogMetaInfo();
break;
case Phase.REST:
faster._hybridLogCheckpointToken = default;
faster._hybridLogCheckpoint.Reset();
var nextTcs = new TaskCompletionSource<LinkedCheckpointInfo>(TaskCreationOptions.RunContinuationsAsynchronously);
faster.checkpointTcs.SetResult(new LinkedCheckpointInfo { NextTask = nextTcs.Task });
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ public void GlobalBeforeEnteringState<Key, Value>(
switch (next.phase)
{
case Phase.PREP_INDEX_CHECKPOINT:
if (faster._indexCheckpointToken == default)
if (faster._indexCheckpoint.IsDefault())
{
faster._indexCheckpointToken = Guid.NewGuid();
faster.InitializeIndexCheckpoint(faster._indexCheckpointToken);
Expand All @@ -40,12 +40,11 @@ public void GlobalBeforeEnteringState<Key, Value>(
// the tail address.
if (faster.ObtainCurrentTailAddress(ref faster._indexCheckpoint.info.finalLogicalAddress))
faster._indexCheckpoint.info.num_buckets = faster.overflowBucketsAllocator.GetMaxValidAddress();
if (faster._indexCheckpointToken != default)
if (!faster._indexCheckpoint.IsDefault())
{
faster.WriteIndexMetaInfo();
faster._indexCheckpointToken = default;
faster._indexCheckpoint.Reset();
}
faster._indexCheckpoint.Reset();

break;
}
Expand Down

0 comments on commit cf0f199

Please sign in to comment.