Commit 3908c99: Record locking (#394)
* Add RecordInfo.SpinLock

* Pass kInvalidAddress to SingleReader for readcache

* IntExclusiveLocker does not require unsafe

* Assert logicalAddress >= ReadOnlyAddress in RecordAccessor.SpinLock

Co-authored-by: Badrish Chandramouli <badrishc@microsoft.com>
TedHartMS and badrishc committed Feb 1, 2021
1 parent aef2bbe commit 3908c99
Showing 5 changed files with 257 additions and 32 deletions.
84 changes: 62 additions & 22 deletions cs/src/core/Index/Common/RecordInfo.cs
@@ -6,8 +6,10 @@
 //#define RECORD_INFO_WITH_PIN_COUNT

 using System;
+using System.Diagnostics;
 using System.Runtime.CompilerServices;
 using System.Runtime.InteropServices;
+using System.Threading;

 namespace FASTER.core
 {
@@ -18,7 +20,7 @@ namespace FASTER.core
 #endif
     public unsafe struct RecordInfo
     {
-        public const int kFinalBitOffset = 48;
+        public const int kLatchBitOffset = 48;

         public const int kTombstoneBitOffset = 49;

@@ -34,7 +36,7 @@ public unsafe struct RecordInfo

         public const long kPreviousAddressMask = (1L << 48) - 1;

-        public const long kFinalBitMask = (1L << kFinalBitOffset);
+        public const long kLatchBitMask = (1L << kLatchBitOffset);

         public const long kTombstoneMask = (1L << kTombstoneBitOffset);

@@ -51,10 +53,9 @@ public unsafe struct RecordInfo
         [FieldOffset(sizeof(long))]
         private int access_data;

-        public static void WriteInfo(RecordInfo* info, int checkpointVersion, bool final, bool tombstone, bool invalidBit, long previousAddress)
+        public static void WriteInfo(RecordInfo* info, int checkpointVersion, bool tombstone, bool invalidBit, long previousAddress)
         {
             info->word = default(long);
-            info->Final = final;
             info->Tombstone = tombstone;
             info->Invalid = invalidBit;
             info->PreviousAddress = previousAddress;
@@ -104,10 +105,9 @@ public void Unpin()
         [FieldOffset(0)]
         private long word;

-        public static void WriteInfo(ref RecordInfo info, int checkpointVersion, bool final, bool tombstone, bool invalidBit, long previousAddress)
+        public static void WriteInfo(ref RecordInfo info, int checkpointVersion, bool tombstone, bool invalidBit, long previousAddress)
         {
             info.word = default;
-            info.Final = final;
             info.Tombstone = tombstone;
             info.Invalid = invalidBit;
             info.PreviousAddress = previousAddress;
@@ -144,46 +144,86 @@ public void Unpin()
             throw new InvalidOperationException();
         }
 #endif
-        public bool IsNull()
-        {
-            return word == 0;
-        }
-
-        public bool Tombstone
-        {
-            get
-            {
-                return (word & kTombstoneMask) > 0;
-            }
-
-            set
-            {
-                if (value)
-                {
-                    word |= kTombstoneMask;
-                }
-                else
-                {
-                    word &= ~kTombstoneMask;
-                }
-            }
-        }
-
-        public bool Final
-        {
-            get
-            {
-                return (word & kFinalBitMask) > 0;
-            }
-            set
-            {
-                if (value)
-                {
-                    word |= kFinalBitMask;
-                }
-                else
-                {
-                    word &= ~kFinalBitMask;
-                }
-            }
-        }
+        /// <summary>
+        /// The RecordInfo locked by this thread, if any.
+        /// </summary>
+        [ThreadStatic]
+        internal static RecordInfo* threadLockedRecord;
+
+        /// <summary>
+        /// The number of times the current thread has (re-)entered the lock.
+        /// </summary>
+        [ThreadStatic]
+        internal static int threadLockedRecordEntryCount;
+
+        [MethodImpl(MethodImplOptions.AggressiveInlining)]
+        public void SpinLock()
+        {
+            // Check for a re-entrant lock.
+            if (threadLockedRecord == (RecordInfo*)Unsafe.AsPointer(ref this))
+            {
+                Debug.Assert(threadLockedRecordEntryCount > 0);
+                ++threadLockedRecordEntryCount;
+                return;
+            }
+
+            // RecordInfo locking is intended for use in concurrent callbacks only (ConcurrentReader, ConcurrentWriter, InPlaceUpdater),
+            // so only the RecordInfo for that callback should be locked. A different RecordInfo being locked implies a missing Unlock.
+            Debug.Assert(threadLockedRecord == null);
+            Debug.Assert(threadLockedRecordEntryCount == 0);
+            while (true)
+            {
+                long expected_word = word;
+                if ((expected_word & kLatchBitMask) == 0)
+                {
+                    var found_word = Interlocked.CompareExchange(ref word, expected_word | kLatchBitMask, expected_word);
+                    if (found_word == expected_word)
+                    {
+                        threadLockedRecord = (RecordInfo*)Unsafe.AsPointer(ref this);
+                        threadLockedRecordEntryCount = 1;
+                        return;
+                    }
+                }
+                Thread.Yield();
+            }
+        }
+
+        [MethodImpl(MethodImplOptions.AggressiveInlining)]
+        public void Unlock()
+        {
+            Debug.Assert(threadLockedRecord == (RecordInfo*)Unsafe.AsPointer(ref this));
+            if (threadLockedRecord == (RecordInfo*)Unsafe.AsPointer(ref this))
+            {
+                Debug.Assert(threadLockedRecordEntryCount > 0);
+                if (--threadLockedRecordEntryCount == 0)
+                {
+                    word &= ~kLatchBitMask;
+                    threadLockedRecord = null;
+                }
+            }
+        }
+
+        public bool IsNull()
+        {
+            return word == 0;
+        }
+
+        public bool Tombstone
+        {
+            get
+            {
+                return (word & kTombstoneMask) > 0;
+            }
+
+            set
+            {
+                if (value)
+                {
+                    word |= kTombstoneMask;
+                }
+                else
+                {
+                    word &= ~kTombstoneMask;
+                }
+            }
+        }
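Note: the SpinLock/Unlock pair above is a single-bit test-and-set over the 64-bit record header word (bit 48, the former Final bit, repurposed as a latch), with [ThreadStatic] state tracking re-entrancy for the locking thread. The following standalone sketch isolates that CAS pattern; the RecordLatch type and its field are illustrative only, not FASTER code, and the re-entrancy bookkeeping is omitted for brevity.

    using System.Threading;

    // Illustrative sketch of the latch-bit pattern used by RecordInfo.SpinLock (not FASTER code).
    // Bit 48 of the header word is claimed with a CAS and cleared by the holder on release.
    class RecordLatch
    {
        const long kLatchBitMask = 1L << 48;
        long word;                              // stand-in for RecordInfo.word

        public void SpinLock()
        {
            while (true)
            {
                long expected = word;           // snapshot the current header word
                if ((expected & kLatchBitMask) == 0 &&
                    Interlocked.CompareExchange(ref word, expected | kLatchBitMask, expected) == expected)
                    return;                     // latch acquired
                Thread.Yield();                 // contended or raced; retry
            }
        }

        // A plain store suffices here because only the latch holder mutates the word while locked.
        public void Unlock() => word &= ~kLatchBitMask;
    }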
16 changes: 8 additions & 8 deletions cs/src/core/Index/FASTER/FASTERImpl.cs
@@ -116,7 +116,8 @@ internal enum LatchOperation : byte
                 }

                 // This is not called when looking up by address, so we do not set pendingContext.recordInfo.
-                fasterSession.SingleReader(ref key, ref input, ref readcache.GetValue(physicalAddress), ref output, logicalAddress);
+                // ReadCache addresses are not valid for indexing etc. so pass kInvalidAddress.
+                fasterSession.SingleReader(ref key, ref input, ref readcache.GetValue(physicalAddress), ref output, Constants.kInvalidAddress);
                 return OperationStatus.SUCCESS;
             }
         }
@@ -433,7 +434,7 @@ internal enum LatchOperation : byte
                 var newPhysicalAddress = hlog.GetPhysicalAddress(newLogicalAddress);
                 RecordInfo.WriteInfo(ref hlog.GetInfo(newPhysicalAddress),
                                      sessionCtx.version,
-                                     true, false, false,
+                                     tombstone:false, invalidBit:false,
                                      latestLogicalAddress);
                 hlog.Serialize(ref key, newPhysicalAddress);
                 fasterSession.SingleWriter(ref key, ref value,
@@ -759,7 +760,7 @@ internal enum LatchOperation : byte
                 BlockAllocate(allocatedSize, out long newLogicalAddress, sessionCtx, fasterSession);
                 var newPhysicalAddress = hlog.GetPhysicalAddress(newLogicalAddress);
                 RecordInfo.WriteInfo(ref hlog.GetInfo(newPhysicalAddress), sessionCtx.version,
-                                     true, false, false,
+                                     tombstone:false, invalidBit:false,
                                      latestLogicalAddress);
                 hlog.Serialize(ref key, newPhysicalAddress);

@@ -1057,8 +1058,7 @@ internal enum LatchOperation : byte
                 BlockAllocate(allocateSize, out long newLogicalAddress, sessionCtx, fasterSession);
                 var newPhysicalAddress = hlog.GetPhysicalAddress(newLogicalAddress);
                 RecordInfo.WriteInfo(ref hlog.GetInfo(newPhysicalAddress),
-                                     sessionCtx.version,
-                                     true, true, false,
+                                     sessionCtx.version, tombstone:true, invalidBit:false,
                                      latestLogicalAddress);
                 hlog.Serialize(ref key, newPhysicalAddress);

@@ -1313,7 +1313,7 @@ internal enum LatchOperation : byte
                     BlockAllocateReadCache(allocatedSize, out newLogicalAddress, currentCtx, fasterSession);
                     newPhysicalAddress = readcache.GetPhysicalAddress(newLogicalAddress);
                     RecordInfo.WriteInfo(ref readcache.GetInfo(newPhysicalAddress), opCtx.version,
-                                         true, false, false,
+                                         tombstone:false, invalidBit:false,
                                          entry.Address);
                     readcache.Serialize(ref key, newPhysicalAddress);
                     fasterSession.SingleWriter(ref key,
@@ -1326,7 +1326,7 @@ internal enum LatchOperation : byte
                     BlockAllocate(allocatedSize, out newLogicalAddress, currentCtx, fasterSession);
                     newPhysicalAddress = hlog.GetPhysicalAddress(newLogicalAddress);
                     RecordInfo.WriteInfo(ref hlog.GetInfo(newPhysicalAddress), opCtx.version,
-                                         true, false, false,
+                                         tombstone:false, invalidBit:false,
                                          latestLogicalAddress);
                     hlog.Serialize(ref key, newPhysicalAddress);
                     fasterSession.SingleWriter(ref key,
@@ -1458,7 +1458,7 @@ internal enum LatchOperation : byte
                 BlockAllocate(allocatedSize, out long newLogicalAddress, sessionCtx, fasterSession);
                 var newPhysicalAddress = hlog.GetPhysicalAddress(newLogicalAddress);
                 RecordInfo.WriteInfo(ref hlog.GetInfo(newPhysicalAddress), opCtx.version,
-                                     true, false, false,
+                                     tombstone:false, invalidBit:false,
                                      latestLogicalAddress);
                 hlog.Serialize(ref key, newPhysicalAddress);
                 if ((request.logicalAddress < hlog.BeginAddress) || (hlog.GetInfoFromBytePointer(request.record.GetValidPointer()).Tombstone))
19 changes: 19 additions & 0 deletions cs/src/core/Index/FASTER/RecordAccessor.cs
@@ -1,6 +1,7 @@
 // Copyright (c) Microsoft Corporation. All rights reserved.
 // Licensed under the MIT license.

+using System.Diagnostics;
 using System.Runtime.CompilerServices;

 namespace FASTER.core
@@ -83,6 +84,24 @@ public bool IsReadCacheAddress(long logicalAddress)
         [MethodImpl(MethodImplOptions.AggressiveInlining)]
         public int Version(long logicalAddress) => GetRecordInfo(logicalAddress).Version;

+        /// <summary>
+        /// Locks the RecordInfo at address
+        /// </summary>
+        /// <param name="logicalAddress">The address to examine</param>
+        [MethodImpl(MethodImplOptions.AggressiveInlining)]
+        public void SpinLock(long logicalAddress)
+        {
+            Debug.Assert(logicalAddress >= this.fkv.Log.ReadOnlyAddress);
+            GetRecordInfo(logicalAddress).SpinLock();
+        }
+
+        /// <summary>
+        /// Unlocks the RecordInfo at address
+        /// </summary>
+        /// <param name="logicalAddress">The address to examine</param>
+        [MethodImpl(MethodImplOptions.AggressiveInlining)]
+        public void Unlock(long logicalAddress) => GetRecordInfo(logicalAddress).Unlock();
+
         #endregion public interface
     }
 }
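The two accessor methods above expose record-level locking by logical address. A hypothetical usage sketch follows; the fkv.RecordAccessor property name and the callback context are assumptions, and the address must come from the mutable in-memory region, since the new Debug.Assert rejects anything below Log.ReadOnlyAddress.

    using FASTER.core;

    // Hypothetical sketch: lock a mutable record while updating it in place.
    // 'fkv' is an open FasterKV<long, long>; 'logicalAddress' is the address of an
    // in-memory record (e.g. one observed in a concurrent callback), at or above
    // fkv.Log.ReadOnlyAddress.
    static void MutateUnderRecordLock(FasterKV<long, long> fkv, long logicalAddress)
    {
        var accessor = fkv.RecordAccessor;      // property name assumed
        accessor.SpinLock(logicalAddress);      // asserts logicalAddress >= Log.ReadOnlyAddress
        try
        {
            // ... mutate the record's value in place while holding the record latch ...
        }
        finally
        {
            accessor.Unlock(logicalAddress);    // clear the latch bit
        }
    }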
4 changes: 2 additions & 2 deletions cs/src/core/Utilities/IntExclusiveLocker.cs
@@ -6,7 +6,7 @@ namespace FASTER.core
     /// <summary>
     /// Exclusive lock + marking using 2 MSB bits of int
    /// </summary>
-    internal unsafe struct IntExclusiveLocker
+    internal struct IntExclusiveLocker
     {
         const int kLatchBitMask = 1 << 31;
         const int kMarkBitMask = 1 << 30;
@@ -30,7 +30,7 @@ public static void SpinLock(ref int value)
         [MethodImpl(MethodImplOptions.AggressiveInlining)]
         public static void Unlock(ref int value)
         {
-            value = value & ~kLatchBitMask;
+            value &= ~kLatchBitMask;
         }

         public static void Mark(ref int value)
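Since IntExclusiveLocker only manipulates an int through a ref parameter with Interlocked operations, no pointer arithmetic is involved, which is why the unsafe modifier could be dropped. A condensed sketch of the same two-MSB scheme follows; the SpinLock body here is the standard CAS loop, assumed rather than copied from the file.

    using System.Threading;

    // Condensed illustration (not the FASTER source): bit 31 is an exclusive latch,
    // bit 30 a mark, both packed into a plain int handled entirely via 'ref int'.
    static class IntLatchSketch
    {
        const int kLatchBitMask = 1 << 31;
        const int kMarkBitMask = 1 << 30;

        public static void SpinLock(ref int value)
        {
            while (true)
            {
                int expected = value;
                if ((expected & kLatchBitMask) == 0 &&
                    Interlocked.CompareExchange(ref value, expected | kLatchBitMask, expected) == expected)
                    return;                     // latch acquired
                Thread.Yield();
            }
        }

        public static void Unlock(ref int value) => value &= ~kLatchBitMask;

        public static bool IsMarked(int value) => (value & kMarkBitMask) != 0;
    }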
