diff --git a/cs/src/core/ClientSession/AdvancedClientSession.cs b/cs/src/core/ClientSession/AdvancedClientSession.cs
index e4862605d..426439514 100644
--- a/cs/src/core/ClientSession/AdvancedClientSession.cs
+++ b/cs/src/core/ClientSession/AdvancedClientSession.cs
@@ -703,7 +703,7 @@ public Status Delete(Key key, Context userContext = default, long serialNo = 0)
/// Experimental feature
/// Checks whether specified record is present in memory
/// (between HeadAddress and tail, or between fromAddress
- /// and tail)
+ /// and tail), including tombstones.
///
/// Key of the record.
/// Logical address of record, if found
diff --git a/cs/src/core/ClientSession/ClientSession.cs b/cs/src/core/ClientSession/ClientSession.cs
index aaacabe08..84deed359 100644
--- a/cs/src/core/ClientSession/ClientSession.cs
+++ b/cs/src/core/ClientSession/ClientSession.cs
@@ -663,7 +663,7 @@ public Status Delete(Key key, Context userContext = default, long serialNo = 0)
/// Experimental feature
/// Checks whether specified record is present in memory
/// (between HeadAddress and tail, or between fromAddress
- /// and tail)
+ /// and tail), including tombstones.
///
/// Key of the record.
/// Logical address of record, if found
@@ -890,6 +890,29 @@ public long Compact(long untilAddress, bool shiftBeginAddre
return fht.Log.Compact(functions, compactionFunctions, untilAddress, shiftBeginAddress);
}
+ ///
+ /// Insert key and value with the record info preserved.
+ /// Succeed only if logical address of the key isn't greater than foundLogicalAddress; otherwise give up and return.
+ ///
+ ///
+ ///
+ ///
+ ///
+ ///
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
+ internal void CopyToTail(ref Key key, ref Value desiredValue, ref RecordInfo recordInfo, long foundLogicalAddress)
+ {
+ if (SupportAsync) UnsafeResumeThread();
+ try
+ {
+ fht.InternalCopyToTail(ref key, ref desiredValue, ref recordInfo, foundLogicalAddress, FasterSession, ctx, noReadCache: true);
+ }
+ finally
+ {
+ if (SupportAsync) UnsafeSuspendThread();
+ }
+ }
+
///
/// Iterator for all (distinct) live key-values stored in FASTER
///
diff --git a/cs/src/core/Index/FASTER/FASTERImpl.cs b/cs/src/core/Index/FASTER/FASTERImpl.cs
index a0d370cdb..e542a4956 100644
--- a/cs/src/core/Index/FASTER/FASTERImpl.cs
+++ b/cs/src/core/Index/FASTER/FASTERImpl.cs
@@ -191,7 +191,7 @@ internal enum LatchOperation : byte
if (CopyReadsToTail == CopyReadsToTail.FromReadOnly)
{
var container = hlog.GetValueContainer(ref hlog.GetValue(physicalAddress));
- InternalUpsert(ref key, ref container.Get(), ref userContext, ref pendingContext, fasterSession, sessionCtx, lsn);
+ InternalTryCopyToTail(ref key, ref container.Get(), ref pendingContext.recordInfo, logicalAddress, fasterSession, sessionCtx);
container.Dispose();
}
return OperationStatus.SUCCESS;
@@ -1369,97 +1369,12 @@ private enum LatchDestination
{
Debug.Assert(RelaxedCPR || pendingContext.version == opCtx.version);
- var bucket = default(HashBucket*);
- var slot = default(int);
- var logicalAddress = Constants.kInvalidAddress;
- var physicalAddress = default(long);
-
- // If NoKey, we do not have the key in the initial call and must use the key from the satisfied request.
ref Key key = ref pendingContext.NoKey ? ref hlog.GetContextRecordKey(ref request) : ref pendingContext.key.Get();
-
- var hash = comparer.GetHashCode64(ref key);
-
- var tag = (ushort)((ulong)hash >> Constants.kHashTagShift);
-
-#region Trace back record in in-memory HybridLog
- var entry = default(HashBucketEntry);
- FindOrCreateTag(hash, tag, ref bucket, ref slot, ref entry, hlog.BeginAddress);
- logicalAddress = entry.word & Constants.kAddressMask;
-
- if (UseReadCache)
- SkipReadCache(ref logicalAddress);
- var latestLogicalAddress = logicalAddress;
-
- if (logicalAddress >= hlog.HeadAddress)
- {
- physicalAddress = hlog.GetPhysicalAddress(logicalAddress);
- if (!comparer.Equals(ref key, ref hlog.GetKey(physicalAddress)))
- {
- logicalAddress = hlog.GetInfo(physicalAddress).PreviousAddress;
- TraceBackForKeyMatch(ref key,
- logicalAddress,
- hlog.HeadAddress,
- out logicalAddress,
- out physicalAddress);
- }
- }
-#endregion
-
- if (logicalAddress > pendingContext.entry.Address)
- {
- // Give up early
- return;
- }
-
-#region Create new copy in mutable region
- physicalAddress = (long)request.record.GetValidPointer();
- var (actualSize, allocatedSize) = hlog.GetRecordSize(physicalAddress);
-
- long newLogicalAddress, newPhysicalAddress;
- if (UseReadCache)
- {
- BlockAllocateReadCache(allocatedSize, out newLogicalAddress, currentCtx, fasterSession);
- newPhysicalAddress = readcache.GetPhysicalAddress(newLogicalAddress);
- RecordInfo.WriteInfo(ref readcache.GetInfo(newPhysicalAddress), opCtx.version,
- tombstone:false, invalidBit:false,
- entry.Address);
- readcache.Serialize(ref key, newPhysicalAddress);
- fasterSession.SingleWriter(ref key,
- ref hlog.GetContextRecordValue(ref request),
- ref readcache.GetValue(newPhysicalAddress, newPhysicalAddress + actualSize));
- }
- else
- {
- BlockAllocate(allocatedSize, out newLogicalAddress, currentCtx, fasterSession);
- newPhysicalAddress = hlog.GetPhysicalAddress(newLogicalAddress);
- RecordInfo.WriteInfo(ref hlog.GetInfo(newPhysicalAddress), opCtx.version,
- tombstone:false, invalidBit:false,
- latestLogicalAddress);
- hlog.Serialize(ref key, newPhysicalAddress);
- fasterSession.SingleWriter(ref key,
- ref hlog.GetContextRecordValue(ref request),
- ref hlog.GetValue(newPhysicalAddress, newPhysicalAddress + actualSize));
- }
-
-
- var updatedEntry = default(HashBucketEntry);
- updatedEntry.Tag = tag;
- updatedEntry.Address = newLogicalAddress & Constants.kAddressMask;
- updatedEntry.Pending = entry.Pending;
- updatedEntry.Tentative = false;
- updatedEntry.ReadCache = UseReadCache;
-
- var foundEntry = default(HashBucketEntry);
- foundEntry.word = Interlocked.CompareExchange(
- ref bucket->bucket_entries[slot],
- updatedEntry.word,
- entry.word);
- if (foundEntry.word != entry.word)
- {
- if (!UseReadCache) hlog.GetInfo(newPhysicalAddress).Invalid = true;
- // We don't retry, just give up
- }
-#endregion
+ byte* physicalAddress = request.record.GetValidPointer();
+ long logicalAddress = pendingContext.entry.Address;
+ ref RecordInfo oldRecordInfo = ref hlog.GetInfoFromBytePointer(physicalAddress);
+
+ InternalTryCopyToTail(opCtx, ref key, ref hlog.GetContextRecordValue(ref request), ref oldRecordInfo, logicalAddress, fasterSession, currentCtx);
}
///
@@ -1890,6 +1805,172 @@ private void ReleaseSharedLatch(Key key)
foundPhysicalAddress = Constants.kInvalidAddress;
return false;
}
+
+
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
+ internal OperationStatus InternalCopyToTail(
+ ref Key key, ref Value value,
+ ref RecordInfo recordInfo,
+ long expectedLogicalAddress,
+ FasterSession fasterSession,
+ FasterExecutionContext currentCtx,
+ bool noReadCache = false)
+ where FasterSession : IFasterSession
+ {
+ OperationStatus internalStatus;
+ do
+ internalStatus = InternalTryCopyToTail(currentCtx, ref key, ref value, ref recordInfo, expectedLogicalAddress, fasterSession, currentCtx, noReadCache);
+ while (internalStatus == OperationStatus.RETRY_NOW);
+ return internalStatus;
+ }
+
+
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
+ internal OperationStatus InternalTryCopyToTail(
+ ref Key key, ref Value value,
+ ref RecordInfo recordInfo,
+ long foundLogicalAddress,
+ FasterSession fasterSession,
+ FasterExecutionContext currentCtx,
+ bool noReadCache = false)
+ where FasterSession : IFasterSession
+ => InternalTryCopyToTail(currentCtx, ref key, ref value, ref recordInfo, foundLogicalAddress, fasterSession, currentCtx, noReadCache);
+
+ ///
+ /// Helper function for trying to copy existing immutable records (at foundLogicalAddress) to the tail,
+ /// used in
+ /// ,
+ /// and
+ ///
+ /// Succeed only if the record for the same key hasn't changed.
+ ///
+ ///
+ ///
+ ///
+ ///
+ ///
+ /// The thread(or session) context to execute operation in.
+ /// It's different from currentCtx only when the function is used in InternalContinuePendingReadCopyToTail
+ ///
+ ///
+ ///
+ ///
+ ///
+ /// The expected address of the record being copied.
+ ///
+ ///
+ ///
+ ///
+ /// If true, it won't clutter read cache.
+ /// Otherwise, it still checks UseReadCache to determine whether to buffer in read cache.
+ /// It is useful in Compact.
+ ///
+ ///
+ /// NOTFOUND: didn't find the expected record of the same key that isn't greater than expectedLogicalAddress
+ /// RETRY_NOW: failed.
+ /// SUCCESS:
+ ///
+ internal OperationStatus InternalTryCopyToTail(
+ FasterExecutionContext opCtx,
+ ref Key key, ref Value value,
+ ref RecordInfo recordInfo,
+ long expectedLogicalAddress,
+ FasterSession fasterSession,
+ FasterExecutionContext currentCtx,
+ bool noReadCache = false)
+ where FasterSession : IFasterSession
+ {
+ Debug.Assert(expectedLogicalAddress >= hlog.BeginAddress);
+ var bucket = default(HashBucket*);
+ var slot = default(int);
+
+ var hash = comparer.GetHashCode64(ref key);
+ var tag = (ushort)((ulong)hash >> Constants.kHashTagShift);
+
+ var entry = default(HashBucketEntry);
+ FindOrCreateTag(hash, tag, ref bucket, ref slot, ref entry, hlog.BeginAddress);
+ var logicalAddress = entry.Address;
+ var physicalAddress = default(long);
+ if (UseReadCache)
+ SkipReadCache(ref logicalAddress);
+ var latestLogicalAddress = logicalAddress;
+
+ if (logicalAddress >= hlog.HeadAddress)
+ {
+ physicalAddress = hlog.GetPhysicalAddress(logicalAddress);
+ if (!comparer.Equals(ref key, ref hlog.GetKey(physicalAddress)))
+ {
+ logicalAddress = hlog.GetInfo(physicalAddress).PreviousAddress;
+ TraceBackForKeyMatch(ref key,
+ logicalAddress,
+ hlog.HeadAddress,
+ out logicalAddress,
+ out physicalAddress);
+ }
+ }
+
+ if (logicalAddress > expectedLogicalAddress || logicalAddress < hlog.BeginAddress)
+ {
+ // We give up early.
+ // Note: In Compact, expectedLogicalAddress may not exactly match the source of this copy operation,
+ // but instead only an upper bound.
+ return OperationStatus.NOTFOUND;
+ }
+ #region Create new copy in mutable region
+ var (actualSize, allocatedSize) = hlog.GetRecordSize(ref key, ref value);
+
+ long newLogicalAddress, newPhysicalAddress;
+ bool copyToReadCache = noReadCache ? false : UseReadCache;
+ if (copyToReadCache)
+ {
+ BlockAllocateReadCache(allocatedSize, out newLogicalAddress, currentCtx, fasterSession);
+ newPhysicalAddress = readcache.GetPhysicalAddress(newLogicalAddress);
+ RecordInfo.WriteInfo(ref readcache.GetInfo(newPhysicalAddress), opCtx.version,
+ tombstone: false, invalidBit: false,
+ entry.Address);
+ readcache.Serialize(ref key, newPhysicalAddress);
+ fasterSession.SingleWriter(ref key,
+ ref value,
+ ref readcache.GetValue(newPhysicalAddress, newPhysicalAddress + actualSize));
+ }
+ else
+ {
+ BlockAllocate(allocatedSize, out newLogicalAddress, currentCtx, fasterSession);
+ newPhysicalAddress = hlog.GetPhysicalAddress(newLogicalAddress);
+ RecordInfo.WriteInfo(ref hlog.GetInfo(newPhysicalAddress), opCtx.version,
+ tombstone: false, invalidBit: false,
+ latestLogicalAddress);
+ hlog.Serialize(ref key, newPhysicalAddress);
+ fasterSession.SingleWriter(ref key,
+ ref value,
+ ref hlog.GetValue(newPhysicalAddress, newPhysicalAddress + actualSize));
+ }
+
+
+ var updatedEntry = default(HashBucketEntry);
+ updatedEntry.Tag = tag;
+ updatedEntry.Address = newLogicalAddress & Constants.kAddressMask;
+ updatedEntry.Pending = entry.Pending;
+ updatedEntry.Tentative = false;
+ updatedEntry.ReadCache = copyToReadCache;
+
+ var foundEntry = default(HashBucketEntry);
+ foundEntry.word = Interlocked.CompareExchange(
+ ref bucket->bucket_entries[slot],
+ updatedEntry.word,
+ entry.word);
+ if (foundEntry.word != entry.word)
+ {
+ if (!copyToReadCache) hlog.GetInfo(newPhysicalAddress).Invalid = true;
+ // Note: only Compact actually retries;
+ // other operations, i.e., copy to tail during reads, just give up if the first try fails
+ return OperationStatus.RETRY_NOW;
+ }
+ else
+ return OperationStatus.SUCCESS;
+ #endregion
+ }
+
#endregion
#region Split Index
diff --git a/cs/src/core/Index/FASTER/LogAccessor.cs b/cs/src/core/Index/FASTER/LogAccessor.cs
index 6cf55e0ac..e93275d33 100644
--- a/cs/src/core/Index/FASTER/LogAccessor.cs
+++ b/cs/src/core/Index/FASTER/LogAccessor.cs
@@ -302,11 +302,13 @@ public void DisposeFromMemory()
if (untilAddress > fht.Log.SafeReadOnlyAddress)
throw new FasterException("Can compact only until Log.SafeReadOnlyAddress");
var originalUntilAddress = untilAddress;
+ var expectedAddress = untilAddress;
var lf = new LogCompactionFunctions(functions);
using var fhtSession = fht.For(lf).NewSession>();
VariableLengthStructSettings variableLengthStructSettings = null;
+ VariableLengthStructSettings variableLengthStructSettingsKaddr = null;
if (allocator is VariableLengthBlittableAllocator varLen)
{
variableLengthStructSettings = new VariableLengthStructSettings
@@ -314,6 +316,11 @@ public void DisposeFromMemory()
keyLength = varLen.KeyLength,
valueLength = varLen.ValueLength,
};
+ variableLengthStructSettingsKaddr = new VariableLengthStructSettings
+ {
+ keyLength = varLen.KeyLength,
+ valueLength = null,
+ };
}
using (var tempKv = new FasterKV(fht.IndexSize, new LogSettings { LogDevice = new NullDevice(), ObjectLogDevice = new NullDevice() }, comparer: fht.Comparer, variableLengthStructSettings: variableLengthStructSettings))
@@ -327,12 +334,23 @@ public void DisposeFromMemory()
ref var value = ref iter1.GetValue();
if (recordInfo.Tombstone || cf.IsDeleted(key, value))
+ {
tempKvSession.Delete(ref key, default, 0);
+ }
else
+ {
tempKvSession.Upsert(ref key, ref value, default, 0);
+ // below is to get and preserve information in RecordInfo, if we need.
+ /*tempKvSession.ContainsKeyInMemory(ref key, out long logicalAddress);
+ long physicalAddress = tempKv.hlog.GetPhysicalAddress(logicalAddress);
+ ref var tempRecordInfo = ref tempKv.hlog.GetInfo(physicalAddress);
+ RecordInfo.WriteInfo(ref tempRecordInfo,
+ tempRecordInfo.Version, false, false, tempRecordInfo.PreviousAddress);*/
+ }
}
// Ensure address is at record boundary
untilAddress = originalUntilAddress = iter1.NextAddress;
+ expectedAddress = untilAddress;
}
// Scan until SafeReadOnlyAddress
@@ -374,8 +392,10 @@ public void DisposeFromMemory()
// Possibly deleted key (once ContainsKeyInMemory is updated to check Tombstones)
continue;
}
-
- fhtSession.Upsert(ref iter3.GetKey(), ref iter3.GetValue(), default, 0);
+ // Note: we use untilAddress as expectedAddress here.
+ // As long as there's no record of the same key whose address is greater than untilAddress,
+ // i.e., the last address that this compact covers, we are safe to copy the old record to the tail.
+ fhtSession.CopyToTail(ref iter3.GetKey(), ref iter3.GetValue(), ref recordInfo, expectedAddress);
}
}
}