Skip to content

Commit

Permalink
Fixed GrowIndex to work correctly (#395)
Browse files Browse the repository at this point in the history
* Fixed GrowIndex to work correctly and to return only after index size doubling is complete.
* Reset overflowBucketsAllocator during resize.
* Add API to get number of overflow buckets.
  • Loading branch information
badrishc committed Feb 1, 2021
1 parent 988907f commit aef2bbe
Show file tree
Hide file tree
Showing 6 changed files with 191 additions and 37 deletions.
42 changes: 39 additions & 3 deletions cs/src/core/Index/FASTER/FASTER.cs
Expand Up @@ -58,6 +58,11 @@ public partial class FasterKV<Key, Value> : FasterBase,
/// </summary>
public long IndexSize => state[resizeInfo.version].size;

/// <summary>
/// Number of overflow buckets in use (64 bytes each)
/// </summary>
public long OverflowBucketCount
{
    get { return overflowBucketsAllocator.GetMaxValidAddress(); }
}

/// <summary>
/// Comparer used by FASTER
/// </summary>
Expand Down Expand Up @@ -656,12 +661,43 @@ public async ValueTask CompleteCheckpointAsync(CancellationToken token = default
}

/// <summary>
/// Grow the hash index by a factor of two. Make sure to take a full checkpoint
/// after growth, for persistence.
/// </summary>
/// <returns>Whether the grow completed</returns>
public bool GrowIndex()
{
    // Growth is driven cooperatively by this thread via the state machine;
    // legacy/non-async sessions can hold epoch protection indefinitely,
    // which would stall the phase transitions below.
    if (LightEpoch.AnyInstanceProtected())
        throw new FasterException("Cannot use GrowIndex when using legacy or non-async sessions");

    // Kick off the index-resize state machine; bail out if another state
    // machine is already in progress.
    if (!StartStateMachine(new IndexResizeStateMachine())) return false;

    epoch.Resume();

    try
    {
        // Participate in the split work until the state machine has left both
        // grow phases, so we return only after index doubling is complete.
        while (true)
        {
            SystemState _systemState = SystemState.Copy(ref systemState);
            if (_systemState.phase == Phase.IN_PROGRESS_GROW)
            {
                // Help split buckets (chunk 0 first; SplitBuckets claims chunks)
                SplitBuckets(0);
                epoch.ProtectAndDrain();
            }
            else
            {
                SystemState.RemoveIntermediate(ref _systemState);
                if (_systemState.phase != Phase.PREPARE_GROW && _systemState.phase != Phase.IN_PROGRESS_GROW)
                {
                    return true;
                }
            }
        }
    }
    finally
    {
        epoch.Suspend();
    }
}

/// <summary>
Expand Down
3 changes: 2 additions & 1 deletion cs/src/core/Index/FASTER/FASTERBase.cs
Expand Up @@ -244,7 +244,8 @@ public unsafe partial class FasterBase
internal long minTableSize = 16;

// Allocator for the hash buckets
internal readonly MallocFixedPageSize<HashBucket> overflowBucketsAllocator;
internal MallocFixedPageSize<HashBucket> overflowBucketsAllocator;
internal MallocFixedPageSize<HashBucket> overflowBucketsAllocatorResize;

// An array of size two, that contains the old and new versions of the hash-table
internal InternalHashTable[] state = new InternalHashTable[2];
Expand Down
94 changes: 66 additions & 28 deletions cs/src/core/Index/FASTER/FASTERImpl.cs
Expand Up @@ -1744,18 +1744,21 @@ private void ReleaseSharedLatch(Key key)
foundPhysicalAddress = Constants.kInvalidAddress;
return false;
}
#endregion
#endregion

#region Split Index
#region Split Index
/// <summary>
/// Split the chunk of hash-index buckets that covers the given hash code,
/// by mapping the hash to a chunk offset and delegating to the offset-based
/// overload.
/// </summary>
private void SplitBuckets(long hash)
{
    // Mask against the other (pre-resize) table version to locate the bucket,
    // then shift the bucket index down to its containing chunk.
    var oldSizeMask = state[1 - resizeInfo.version].size_mask;
    var chunkOffset = (int)((hash & oldSizeMask) >> Constants.kSizeofChunkBits);
    SplitBuckets(chunkOffset);
}

private void SplitBuckets(int offset)
{
int numChunks = (int)(state[1 - resizeInfo.version].size / Constants.kSizeofChunk);
if (numChunks == 0) numChunks = 1; // at least one chunk


if (!Utility.IsPowerOfTwo(numChunks))
{
throw new FasterException("Invalid number of chunks: " + numChunks);
Expand All @@ -1780,6 +1783,8 @@ private void SplitBuckets(long hash)
{
// GC old version of hash table
state[1 - resizeInfo.version] = default;
overflowBucketsAllocatorResize.Dispose();
overflowBucketsAllocatorResize = null;
GlobalStateMachineStep(systemState);
return;
}
Expand All @@ -1789,7 +1794,7 @@ private void SplitBuckets(long hash)

while (Interlocked.Read(ref splitStatus[offset & (numChunks - 1)]) == 1)
{

Thread.Yield();
}

}
Expand Down Expand Up @@ -1821,17 +1826,26 @@ private void SplitBuckets(long hash)
}

var logicalAddress = entry.Address;
if (logicalAddress >= hlog.HeadAddress)
long physicalAddress = 0;

if (entry.ReadCache && (entry.Address & ~Constants.kReadCacheBitMask) >= readcache.HeadAddress)
physicalAddress = readcache.GetPhysicalAddress(entry.Address & ~Constants.kReadCacheBitMask);
else if (logicalAddress >= hlog.HeadAddress)
physicalAddress = hlog.GetPhysicalAddress(logicalAddress);

// It is safe to always use hlog instead of readcache for some calls such
// as GetKey and GetInfo
if (physicalAddress != 0)
{
var physicalAddress = hlog.GetPhysicalAddress(logicalAddress);
var hash = comparer.GetHashCode64(ref hlog.GetKey(physicalAddress));
if ((hash & state[resizeInfo.version].size_mask) >> (state[resizeInfo.version].size_bits - 1) == 0)
{
// Insert in left
if (left == left_end)
{
var new_bucket = (HashBucket*)overflowBucketsAllocator.Allocate();
*left = (long)new_bucket;
var new_bucket_logical = overflowBucketsAllocator.Allocate();
var new_bucket = (HashBucket*)overflowBucketsAllocator.GetPhysicalAddress(new_bucket_logical);
*left = new_bucket_logical;
left = (long*)new_bucket;
left_end = left + Constants.kOverflowBucketIndex;
}
Expand All @@ -1841,12 +1855,13 @@ private void SplitBuckets(long hash)

// Insert previous address in right
entry.Address = TraceBackForOtherChainStart(hlog.GetInfo(physicalAddress).PreviousAddress, 1);
if (entry.Address != Constants.kInvalidAddress)
if ((entry.Address != Constants.kInvalidAddress) && (entry.Address != Constants.kTempInvalidAddress))
{
if (right == right_end)
{
var new_bucket = (HashBucket*)overflowBucketsAllocator.Allocate();
*right = (long)new_bucket;
var new_bucket_logical = overflowBucketsAllocator.Allocate();
var new_bucket = (HashBucket*)overflowBucketsAllocator.GetPhysicalAddress(new_bucket_logical);
*right = new_bucket_logical;
right = (long*)new_bucket;
right_end = right + Constants.kOverflowBucketIndex;
}
Expand All @@ -1860,8 +1875,9 @@ private void SplitBuckets(long hash)
// Insert in right
if (right == right_end)
{
var new_bucket = (HashBucket*)overflowBucketsAllocator.Allocate();
*right = (long)new_bucket;
var new_bucket_logical = overflowBucketsAllocator.Allocate();
var new_bucket = (HashBucket*)overflowBucketsAllocator.GetPhysicalAddress(new_bucket_logical);
*right = new_bucket_logical;
right = (long*)new_bucket;
right_end = right + Constants.kOverflowBucketIndex;
}
Expand All @@ -1871,12 +1887,13 @@ private void SplitBuckets(long hash)

// Insert previous address in left
entry.Address = TraceBackForOtherChainStart(hlog.GetInfo(physicalAddress).PreviousAddress, 0);
if (entry.Address != Constants.kInvalidAddress)
if ((entry.Address != Constants.kInvalidAddress) && (entry.Address != Constants.kTempInvalidAddress))
{
if (left == left_end)
{
var new_bucket = (HashBucket*)overflowBucketsAllocator.Allocate();
*left = (long)new_bucket;
var new_bucket_logical = overflowBucketsAllocator.Allocate();
var new_bucket = (HashBucket*)overflowBucketsAllocator.GetPhysicalAddress(new_bucket_logical);
*left = new_bucket_logical;
left = (long*)new_bucket;
left_end = left + Constants.kOverflowBucketIndex;
}
Expand All @@ -1893,8 +1910,9 @@ private void SplitBuckets(long hash)
// Insert in left
if (left == left_end)
{
var new_bucket = (HashBucket*)overflowBucketsAllocator.Allocate();
*left = (long)new_bucket;
var new_bucket_logical = overflowBucketsAllocator.Allocate();
var new_bucket = (HashBucket*)overflowBucketsAllocator.GetPhysicalAddress(new_bucket_logical);
*left = new_bucket_logical;
left = (long*)new_bucket;
left_end = left + Constants.kOverflowBucketIndex;
}
Expand All @@ -1905,8 +1923,9 @@ private void SplitBuckets(long hash)
// Insert in right
if (right == right_end)
{
var new_bucket = (HashBucket*)overflowBucketsAllocator.Allocate();
*right = (long)new_bucket;
var new_bucket_logical = overflowBucketsAllocator.Allocate();
var new_bucket = (HashBucket*)overflowBucketsAllocator.GetPhysicalAddress(new_bucket_logical);
*right = new_bucket_logical;
right = (long*)new_bucket;
right_end = right + Constants.kOverflowBucketIndex;
}
Expand All @@ -1917,22 +1936,41 @@ private void SplitBuckets(long hash)
}

if (*(((long*)src_start) + Constants.kOverflowBucketIndex) == 0) break;
src_start = (HashBucket*)overflowBucketsAllocator.GetPhysicalAddress(*(((long*)src_start) + Constants.kOverflowBucketIndex));
src_start = (HashBucket*)overflowBucketsAllocatorResize.GetPhysicalAddress(*(((long*)src_start) + Constants.kOverflowBucketIndex));
} while (true);
}
}

/// <summary>
/// Walk back along a record chain (which may span the read cache and the main
/// log) to find the first record whose key hashes to the given side of the
/// index split.
/// </summary>
/// <param name="logicalAddress">Chain start address (may carry the read-cache bit)</param>
/// <param name="bit">Target half of the doubled index (0 or 1), compared against
/// the top bit of the masked hash</param>
/// <returns>Address of the first matching record, or the first address that fell
/// below the in-memory boundary if no in-memory record matched</returns>
private long TraceBackForOtherChainStart(long logicalAddress, int bit)
{
    while (true)
    {
        HashBucketEntry entry = default;
        entry.Address = logicalAddress;
        if (entry.ReadCache)
        {
            // NOTE(review): logicalAddress is passed to readcache as-is here;
            // confirm whether the read-cache bit needs masking first, as is
            // done at other readcache call sites.
            if (logicalAddress < readcache.HeadAddress)
                break;
            var physicalAddress = readcache.GetPhysicalAddress(logicalAddress);
            var hash = comparer.GetHashCode64(ref readcache.GetKey(physicalAddress));
            if ((hash & state[resizeInfo.version].size_mask) >> (state[resizeInfo.version].size_bits - 1) == bit)
            {
                return logicalAddress;
            }
            // Keep walking the chain within/through the read cache
            logicalAddress = readcache.GetInfo(physicalAddress).PreviousAddress;
        }
        else
        {
            // On-disk portion of the chain cannot be examined; stop here
            if (logicalAddress < hlog.HeadAddress)
                break;
            var physicalAddress = hlog.GetPhysicalAddress(logicalAddress);
            var hash = comparer.GetHashCode64(ref hlog.GetKey(physicalAddress));
            if ((hash & state[resizeInfo.version].size_mask) >> (state[resizeInfo.version].size_bits - 1) == bit)
            {
                return logicalAddress;
            }
            logicalAddress = hlog.GetInfo(physicalAddress).PreviousAddress;
        }
    }
    return logicalAddress;
}
Expand Down Expand Up @@ -2087,7 +2125,7 @@ private long GetLatestRecordVersion(ref HashBucketEntry entry, long defaultVersi
if (UseReadCache && entry.ReadCache)
{
var _addr = readcache.GetPhysicalAddress(entry.Address & ~Constants.kReadCacheBitMask);
if (entry.Address >= readcache.HeadAddress)
if ((entry.Address & ~Constants.kReadCacheBitMask) >= readcache.HeadAddress)
return readcache.GetInfo(_addr).Version;
else
return defaultVersion;
Expand Down
13 changes: 9 additions & 4 deletions cs/src/core/Index/Recovery/IndexRecovery.cs
Expand Up @@ -39,15 +39,20 @@ private uint InitializeMainIndexRecovery(ref IndexCheckpointInfo info, bool isAs
var token = info.info.token;
var ht_version = resizeInfo.version;

if (state[ht_version].size != info.info.table_size)
throw new FasterException($"Incompatible hash table size during recovery; allocated {state[ht_version].size} buckets, recovering {info.info.table_size} buckets");

// Create devices to read from using Async API
info.main_ht_device = checkpointManager.GetIndexDevice(token);
var sectorSize = info.main_ht_device.SectorSize;

if (state[ht_version].size != info.info.table_size)
{
Free(ht_version);
Initialize(info.info.table_size, (int)sectorSize);
}


BeginMainIndexRecovery(ht_version, info.main_ht_device, info.info.num_ht_bytes, isAsync);

var sectorSize = info.main_ht_device.SectorSize;

var alignedIndexSize = (uint)((info.info.num_ht_bytes + (sectorSize - 1)) & ~(sectorSize - 1));
return alignedIndexSize;
}
Expand Down
3 changes: 2 additions & 1 deletion cs/src/core/Index/Synchronization/IndexResizeStateMachine.cs
Expand Up @@ -26,7 +26,8 @@ internal sealed class IndexResizeTask : ISynchronizationTask

faster.numPendingChunksToBeSplit = numChunks;
faster.splitStatus = new long[numChunks];

faster.overflowBucketsAllocatorResize = faster.overflowBucketsAllocator;
faster.overflowBucketsAllocator = new MallocFixedPageSize<HashBucket>(false);
faster.Initialize(1 - faster.resizeInfo.version, faster.state[faster.resizeInfo.version].size * 2, faster.sectorSize);

faster.resizeInfo.version = 1 - faster.resizeInfo.version;
Expand Down
73 changes: 73 additions & 0 deletions cs/test/RecoveryChecks.cs
Expand Up @@ -309,5 +309,78 @@ public async ValueTask RecoveryCheck4([Values] CheckpointType checkpointType, [V
s2.CompletePending(true);
}
}

[Test]
public async ValueTask RecoveryCheck5([Values] CheckpointType checkpointType, [Values] bool isAsync, [Values] bool useReadCache, [Values(128, 1 << 10)] int size)
{
    // End-to-end check: populate, (optionally) fill the read cache, grow the
    // index, checkpoint, then recover into a second store and verify all keys.
    // MemorySizeBits = 14 keeps memory small so FlushAndEvict has effect.
    using var fht1 = new FasterKV<long, long>
        (size,
        logSettings: new LogSettings { LogDevice = log, MutableFraction = 1, PageSizeBits = 10, MemorySizeBits = 14, ReadCacheSettings = useReadCache ? new ReadCacheSettings() : null },
        checkpointSettings: new CheckpointSettings { CheckpointDir = path }
        );

    using var s1 = fht1.NewSession(new MyFunctions());
    for (long key = 0; key < 1000; key++)
    {
        s1.Upsert(ref key, ref key);
    }

    if (useReadCache)
    {
        // Evict everything to disk, then read each key back so records get
        // pulled into the read cache before the index is grown.
        fht1.Log.FlushAndEvict(true);
        for (long key = 0; key < 1000; key++)
        {
            long output = default;
            var status = s1.Read(ref key, ref output);
            // Only non-pending reads are checked inline; pending ones complete below
            if (status != Status.PENDING)
                Assert.IsTrue(status == Status.OK && output == key);
        }
        s1.CompletePending(true);
    }

    // Double the hash index (the behavior under test)
    fht1.GrowIndex();

    // All keys must still be reachable through the grown index
    for (long key = 0; key < 1000; key++)
    {
        long output = default;
        var status = s1.Read(ref key, ref output);
        if (status != Status.PENDING)
            Assert.IsTrue(status == Status.OK && output == key);
    }
    s1.CompletePending(true);

    var task = fht1.TakeFullCheckpointAsync(checkpointType);

    // Second store recovers from the checkpoint taken after growth
    using var fht2 = new FasterKV<long, long>
        (size,
        logSettings: new LogSettings { LogDevice = log, MutableFraction = 1, PageSizeBits = 10, MemorySizeBits = 20, ReadCacheSettings = useReadCache ? new ReadCacheSettings() : null },
        checkpointSettings: new CheckpointSettings { CheckpointDir = path }
        );

    if (isAsync)
    {
        await task;
        await fht2.RecoverAsync();
    }
    else
    {
        task.GetAwaiter().GetResult();
        fht2.Recover();
    }

    // Recovered log boundaries must match the source store
    Assert.IsTrue(fht1.Log.HeadAddress == fht2.Log.HeadAddress);
    Assert.IsTrue(fht1.Log.ReadOnlyAddress == fht2.Log.ReadOnlyAddress);
    Assert.IsTrue(fht1.Log.TailAddress == fht2.Log.TailAddress);

    // And every key must be readable from the recovered store
    using var s2 = fht2.NewSession(new MyFunctions());
    for (long key = 0; key < 1000; key++)
    {
        long output = default;
        var status = s2.Read(ref key, ref output);
        if (status != Status.PENDING)
            Assert.IsTrue(status == Status.OK && output == key);
    }
    s2.CompletePending(true);
}
}
}

0 comments on commit aef2bbe

Please sign in to comment.