Skip to content

Commit

Permalink
[C#] Fix data races between copy to tail and concurrent upserts (#501)
Browse files Browse the repository at this point in the history
* [bug fix]
copy from read-only region to tail now won't race with upserts

* use InternalTryCopyToTail to handle copy to tail in general

* add code example to preserve recordInfo in compact

* fix bugs in previous commit

* update comments
InternalTryCopyToTail takes two sessions for copy from disk to tail

* indent

* fix bugs
copy to tail now automatically retries on a failed CAS
compact now doesn't copy to read cache

* optimize InternalCopyToTail
retry without finding the entry again.

* temp k->addr in Compact

* take back try copy to tail

* temporarily comment out assert for passing check

* clean up comment
move logical address check in Compact to debug

* address comments suggested by Ted
remove debug code in compact and CopyToTail
add comments

Co-authored-by: Badrish Chandramouli <badrishc@microsoft.com>
  • Loading branch information
qtcwt and badrishc committed Jul 21, 2021
1 parent 5022cd6 commit 2a2bf2f
Show file tree
Hide file tree
Showing 4 changed files with 219 additions and 95 deletions.
2 changes: 1 addition & 1 deletion cs/src/core/ClientSession/AdvancedClientSession.cs
Expand Up @@ -703,7 +703,7 @@ public Status Delete(Key key, Context userContext = default, long serialNo = 0)
/// Experimental feature
/// Checks whether specified record is present in memory
/// (between HeadAddress and tail, or between fromAddress
/// and tail)
/// and tail), including tombstones.
/// </summary>
/// <param name="key">Key of the record.</param>
/// <param name="logicalAddress">Logical address of record, if found</param>
Expand Down
25 changes: 24 additions & 1 deletion cs/src/core/ClientSession/ClientSession.cs
Expand Up @@ -663,7 +663,7 @@ public Status Delete(Key key, Context userContext = default, long serialNo = 0)
/// Experimental feature
/// Checks whether specified record is present in memory
/// (between HeadAddress and tail, or between fromAddress
/// and tail)
/// and tail), including tombstones.
/// </summary>
/// <param name="key">Key of the record.</param>
/// <param name="logicalAddress">Logical address of record, if found</param>
Expand Down Expand Up @@ -890,6 +890,29 @@ public long Compact<CompactionFunctions>(long untilAddress, bool shiftBeginAddre
return fht.Log.Compact<Input, Output, Context, Functions, CompactionFunctions>(functions, compactionFunctions, untilAddress, shiftBeginAddress);
}

/// <summary>
/// Insert key and value with the record info preserved.
/// Succeeds only if the key's current logical address isn't greater than foundLogicalAddress;
/// otherwise gives up and returns without copying.
/// </summary>
/// <param name="key">Key of the record to copy to the tail.</param>
/// <param name="desiredValue">Value to write into the new tail record.</param>
/// <param name="recordInfo">Record info to carry over to the copied record.</param>
/// <param name="foundLogicalAddress">Upper bound on the key's current logical address; if a newer record for the key exists, the copy is abandoned.</param>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
internal void CopyToTail(ref Key key, ref Value desiredValue, ref RecordInfo recordInfo, long foundLogicalAddress)
{
    // Re-enter epoch protection for async-capable sessions before touching the log.
    if (SupportAsync) UnsafeResumeThread();
    try
    {
        // noReadCache: true — the copy must land in the main log, not the read cache
        // (this entry point is intended for Compact; see InternalTryCopyToTail docs).
        fht.InternalCopyToTail(ref key, ref desiredValue, ref recordInfo, foundLogicalAddress, FasterSession, ctx, noReadCache: true);
    }
    finally
    {
        if (SupportAsync) UnsafeSuspendThread();
    }
}

/// <summary>
/// Iterator for all (distinct) live key-values stored in FASTER
/// </summary>
Expand Down
263 changes: 172 additions & 91 deletions cs/src/core/Index/FASTER/FASTERImpl.cs
Expand Up @@ -191,7 +191,7 @@ internal enum LatchOperation : byte
if (CopyReadsToTail == CopyReadsToTail.FromReadOnly)
{
var container = hlog.GetValueContainer(ref hlog.GetValue(physicalAddress));
InternalUpsert(ref key, ref container.Get(), ref userContext, ref pendingContext, fasterSession, sessionCtx, lsn);
InternalTryCopyToTail(ref key, ref container.Get(), ref pendingContext.recordInfo, logicalAddress, fasterSession, sessionCtx);
container.Dispose();
}
return OperationStatus.SUCCESS;
Expand Down Expand Up @@ -1369,97 +1369,12 @@ private enum LatchDestination
{
Debug.Assert(RelaxedCPR || pendingContext.version == opCtx.version);

var bucket = default(HashBucket*);
var slot = default(int);
var logicalAddress = Constants.kInvalidAddress;
var physicalAddress = default(long);

// If NoKey, we do not have the key in the initial call and must use the key from the satisfied request.
ref Key key = ref pendingContext.NoKey ? ref hlog.GetContextRecordKey(ref request) : ref pendingContext.key.Get();

var hash = comparer.GetHashCode64(ref key);

var tag = (ushort)((ulong)hash >> Constants.kHashTagShift);

#region Trace back record in in-memory HybridLog
var entry = default(HashBucketEntry);
FindOrCreateTag(hash, tag, ref bucket, ref slot, ref entry, hlog.BeginAddress);
logicalAddress = entry.word & Constants.kAddressMask;

if (UseReadCache)
SkipReadCache(ref logicalAddress);
var latestLogicalAddress = logicalAddress;

if (logicalAddress >= hlog.HeadAddress)
{
physicalAddress = hlog.GetPhysicalAddress(logicalAddress);
if (!comparer.Equals(ref key, ref hlog.GetKey(physicalAddress)))
{
logicalAddress = hlog.GetInfo(physicalAddress).PreviousAddress;
TraceBackForKeyMatch(ref key,
logicalAddress,
hlog.HeadAddress,
out logicalAddress,
out physicalAddress);
}
}
#endregion

if (logicalAddress > pendingContext.entry.Address)
{
// Give up early
return;
}

#region Create new copy in mutable region
physicalAddress = (long)request.record.GetValidPointer();
var (actualSize, allocatedSize) = hlog.GetRecordSize(physicalAddress);

long newLogicalAddress, newPhysicalAddress;
if (UseReadCache)
{
BlockAllocateReadCache(allocatedSize, out newLogicalAddress, currentCtx, fasterSession);
newPhysicalAddress = readcache.GetPhysicalAddress(newLogicalAddress);
RecordInfo.WriteInfo(ref readcache.GetInfo(newPhysicalAddress), opCtx.version,
tombstone:false, invalidBit:false,
entry.Address);
readcache.Serialize(ref key, newPhysicalAddress);
fasterSession.SingleWriter(ref key,
ref hlog.GetContextRecordValue(ref request),
ref readcache.GetValue(newPhysicalAddress, newPhysicalAddress + actualSize));
}
else
{
BlockAllocate(allocatedSize, out newLogicalAddress, currentCtx, fasterSession);
newPhysicalAddress = hlog.GetPhysicalAddress(newLogicalAddress);
RecordInfo.WriteInfo(ref hlog.GetInfo(newPhysicalAddress), opCtx.version,
tombstone:false, invalidBit:false,
latestLogicalAddress);
hlog.Serialize(ref key, newPhysicalAddress);
fasterSession.SingleWriter(ref key,
ref hlog.GetContextRecordValue(ref request),
ref hlog.GetValue(newPhysicalAddress, newPhysicalAddress + actualSize));
}


var updatedEntry = default(HashBucketEntry);
updatedEntry.Tag = tag;
updatedEntry.Address = newLogicalAddress & Constants.kAddressMask;
updatedEntry.Pending = entry.Pending;
updatedEntry.Tentative = false;
updatedEntry.ReadCache = UseReadCache;

var foundEntry = default(HashBucketEntry);
foundEntry.word = Interlocked.CompareExchange(
ref bucket->bucket_entries[slot],
updatedEntry.word,
entry.word);
if (foundEntry.word != entry.word)
{
if (!UseReadCache) hlog.GetInfo(newPhysicalAddress).Invalid = true;
// We don't retry, just give up
}
#endregion
byte* physicalAddress = request.record.GetValidPointer();
long logicalAddress = pendingContext.entry.Address;
ref RecordInfo oldRecordInfo = ref hlog.GetInfoFromBytePointer(physicalAddress);

InternalTryCopyToTail(opCtx, ref key, ref hlog.GetContextRecordValue(ref request), ref oldRecordInfo, logicalAddress, fasterSession, currentCtx);
}

/// <summary>
Expand Down Expand Up @@ -1890,6 +1805,172 @@ private void ReleaseSharedLatch(Key key)
foundPhysicalAddress = Constants.kInvalidAddress;
return false;
}


/// <summary>
/// Copies a record to the tail of the log, retrying as long as the attempt fails
/// with RETRY_NOW (i.e., the bucket-entry CAS in InternalTryCopyToTail lost a race).
/// Returns the first non-retry status (SUCCESS or NOTFOUND).
/// </summary>
/// <param name="key">Key of the record to copy.</param>
/// <param name="value">Value to write into the new tail record.</param>
/// <param name="recordInfo">Record info associated with the source record.</param>
/// <param name="expectedLogicalAddress">Upper bound on the key's current logical address; copying is abandoned if a newer record exists.</param>
/// <param name="fasterSession">Callback functions for the session.</param>
/// <param name="currentCtx">Execution context of the calling session.</param>
/// <param name="noReadCache">If true, never place the copy in the read cache.</param>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
internal OperationStatus InternalCopyToTail<Input, Output, Context, FasterSession>(
                        ref Key key, ref Value value,
                        ref RecordInfo recordInfo,
                        long expectedLogicalAddress,
                        FasterSession fasterSession,
                        FasterExecutionContext<Input, Output, Context> currentCtx,
                        bool noReadCache = false)
    where FasterSession : IFasterSession<Key, Value, Input, Output, Context>
{
    while (true)
    {
        var status = InternalTryCopyToTail(currentCtx, ref key, ref value, ref recordInfo, expectedLogicalAddress, fasterSession, currentCtx, noReadCache);
        if (status != OperationStatus.RETRY_NOW)
            return status;
        // CAS lost to a concurrent update of the same bucket slot; try again.
    }
}


/// <summary>
/// Convenience overload of the main InternalTryCopyToTail that uses
/// <paramref name="currentCtx"/> as both the operation context and the session context.
/// Makes a single attempt; returns RETRY_NOW if the bucket-entry CAS fails.
/// </summary>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
internal OperationStatus InternalTryCopyToTail<Input, Output, Context, FasterSession>(
                        ref Key key, ref Value value,
                        ref RecordInfo recordInfo,
                        long foundLogicalAddress,
                        FasterSession fasterSession,
                        FasterExecutionContext<Input, Output, Context> currentCtx,
                        bool noReadCache = false)
    where FasterSession : IFasterSession<Key, Value, Input, Output, Context>
{
    return InternalTryCopyToTail(currentCtx, ref key, ref value, ref recordInfo, foundLogicalAddress, fasterSession, currentCtx, noReadCache);
}

/// <summary>
/// Helper function for trying to copy existing immutable records (at foundLogicalAddress) to the tail,
/// used in <see cref="InternalRead{Input, Output, Context, Functions}(ref Key, ref Input, ref Output, long, ref Context, ref PendingContext{Input, Output, Context}, Functions, FasterExecutionContext{Input, Output, Context}, long)"/>
/// <see cref="InternalContinuePendingReadCopyToTail{Input, Output, Context, FasterSession}(FasterExecutionContext{Input, Output, Context}, AsyncIOContext{Key, Value}, ref PendingContext{Input, Output, Context}, FasterSession, FasterExecutionContext{Input, Output, Context})"/>,
/// and <see cref="ClientSession{Key, Value, Input, Output, Context, Functions}.CopyToTail(ref Key, ref Value, ref RecordInfo, long, bool)"/>
///
/// Succeeds only if the record for the same key hasn't changed (no newer record for the key
/// exists at the time the copy is installed).
/// </summary>
/// <typeparam name="Input">Input type of the session.</typeparam>
/// <typeparam name="Output">Output type of the session.</typeparam>
/// <typeparam name="Context">User context type of the session.</typeparam>
/// <typeparam name="FasterSession">Session callback-functions wrapper type.</typeparam>
/// <param name="opCtx">
/// The thread (or session) context to execute the operation in; its version stamps the new record.
/// It differs from currentCtx only when the function is used in InternalContinuePendingReadCopyToTail.
/// </param>
/// <param name="key">Key of the record to copy.</param>
/// <param name="value">Value to write into the new tail record.</param>
/// <param name="recordInfo">
/// Record info of the source record.
/// NOTE(review): not referenced in this body — presumably carried for callers/compaction; confirm.
/// </param>
/// <param name="expectedLogicalAddress">
/// The expected address of the record being copied; the copy is abandoned if the key's
/// latest record is newer than this address.
/// </param>
/// <param name="fasterSession">Callback functions for the session.</param>
/// <param name="currentCtx">Execution context of the calling session (used for allocation/epoch bumps).</param>
/// <param name="noReadCache">
/// If true, it won't clutter the read cache.
/// Otherwise, it still checks UseReadCache to determine whether to buffer in the read cache.
/// It is useful in Compact.
/// </param>
/// <returns>
/// NOTFOUND: no record for the key exists at or below expectedLogicalAddress (a newer record
///           was found, or the record fell below BeginAddress) — copy abandoned.
/// RETRY_NOW: the bucket-entry CAS failed because another thread updated the slot concurrently.
///            Note: only Compact actually retries; reads that copy to tail just give up.
/// SUCCESS: the record was copied to the tail (or read cache) and the hash entry installed.
/// </returns>
internal OperationStatus InternalTryCopyToTail<Input, Output, Context, FasterSession>(
                        FasterExecutionContext<Input, Output, Context> opCtx,
                        ref Key key, ref Value value,
                        ref RecordInfo recordInfo,
                        long expectedLogicalAddress,
                        FasterSession fasterSession,
                        FasterExecutionContext<Input, Output, Context> currentCtx,
                        bool noReadCache = false)
    where FasterSession : IFasterSession<Key, Value, Input, Output, Context>
{
    Debug.Assert(expectedLogicalAddress >= hlog.BeginAddress);
    var bucket = default(HashBucket*);
    var slot = default(int);

    var hash = comparer.GetHashCode64(ref key);
    var tag = (ushort)((ulong)hash >> Constants.kHashTagShift);

    // Locate (or create) the hash bucket entry for this key's tag.
    var entry = default(HashBucketEntry);
    FindOrCreateTag(hash, tag, ref bucket, ref slot, ref entry, hlog.BeginAddress);
    var logicalAddress = entry.Address;
    var physicalAddress = default(long);
    // Skip over read-cache entries to reach the latest main-log address for the key.
    if (UseReadCache)
        SkipReadCache(ref logicalAddress);
    var latestLogicalAddress = logicalAddress;

    if (logicalAddress >= hlog.HeadAddress)
    {
        physicalAddress = hlog.GetPhysicalAddress(logicalAddress);
        // Entry record may be for a different key with the same tag; trace back the
        // in-memory chain until a record for this key is found (or we fall below HeadAddress).
        if (!comparer.Equals(ref key, ref hlog.GetKey(physicalAddress)))
        {
            logicalAddress = hlog.GetInfo(physicalAddress).PreviousAddress;
            TraceBackForKeyMatch(ref key,
                                    logicalAddress,
                                    hlog.HeadAddress,
                                    out logicalAddress,
                                    out physicalAddress);
        }
    }

    if (logicalAddress > expectedLogicalAddress || logicalAddress < hlog.BeginAddress)
    {
        // We give up early: either a concurrent upsert created a newer record for this key,
        // or the source record has been truncated below BeginAddress.
        // Note: In Compact, expectedLogicalAddress may not exactly match the source of this copy operation,
        // but instead only an upper bound.
        return OperationStatus.NOTFOUND;
    }
    #region Create new copy in mutable region
    var (actualSize, allocatedSize) = hlog.GetRecordSize(ref key, ref value);

    long newLogicalAddress, newPhysicalAddress;
    // noReadCache forces the copy into the main log even when a read cache is configured.
    bool copyToReadCache = noReadCache ? false : UseReadCache;
    if (copyToReadCache)
    {
        BlockAllocateReadCache(allocatedSize, out newLogicalAddress, currentCtx, fasterSession);
        newPhysicalAddress = readcache.GetPhysicalAddress(newLogicalAddress);
        // Read-cache record chains to the original hash-entry address.
        RecordInfo.WriteInfo(ref readcache.GetInfo(newPhysicalAddress), opCtx.version,
                            tombstone: false, invalidBit: false,
                            entry.Address);
        readcache.Serialize(ref key, newPhysicalAddress);
        fasterSession.SingleWriter(ref key,
                                ref value,
                                ref readcache.GetValue(newPhysicalAddress, newPhysicalAddress + actualSize));
    }
    else
    {
        BlockAllocate(allocatedSize, out newLogicalAddress, currentCtx, fasterSession);
        newPhysicalAddress = hlog.GetPhysicalAddress(newLogicalAddress);
        // Main-log record chains to the latest main-log address (read cache skipped).
        RecordInfo.WriteInfo(ref hlog.GetInfo(newPhysicalAddress), opCtx.version,
                            tombstone: false, invalidBit: false,
                            latestLogicalAddress);
        hlog.Serialize(ref key, newPhysicalAddress);
        fasterSession.SingleWriter(ref key,
                                ref value,
                                ref hlog.GetValue(newPhysicalAddress, newPhysicalAddress + actualSize));
    }


    // Atomically install the new record in the hash bucket slot, but only if the slot
    // still holds the entry we read — this is what makes the copy race-free vs. upserts.
    var updatedEntry = default(HashBucketEntry);
    updatedEntry.Tag = tag;
    updatedEntry.Address = newLogicalAddress & Constants.kAddressMask;
    updatedEntry.Pending = entry.Pending;
    updatedEntry.Tentative = false;
    updatedEntry.ReadCache = copyToReadCache;

    var foundEntry = default(HashBucketEntry);
    foundEntry.word = Interlocked.CompareExchange(
                        ref bucket->bucket_entries[slot],
                        updatedEntry.word,
                        entry.word);
    if (foundEntry.word != entry.word)
    {
        // CAS lost; abandon the freshly written record (read-cache records need no invalidation here).
        if (!copyToReadCache) hlog.GetInfo(newPhysicalAddress).Invalid = true;
        // Note: only Compact actually retries;
        // other operations, i.e., copy to tail during reads, just give up if the first try fails
        return OperationStatus.RETRY_NOW;
    }
    else
        return OperationStatus.SUCCESS;
    #endregion
}

#endregion

#region Split Index
Expand Down

0 comments on commit 2a2bf2f

Please sign in to comment.