Skip to content

Commit

Permalink
Add TryLock() to lockable contexts
Browse files Browse the repository at this point in the history
  • Loading branch information
TedHartMS committed Jun 3, 2023
1 parent 77296d9 commit 6e24390
Show file tree
Hide file tree
Showing 4 changed files with 418 additions and 2 deletions.
87 changes: 85 additions & 2 deletions cs/src/core/ClientSession/ILockableContext.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT license.

using System.Threading;
using System;

namespace FASTER.core
{
/// <summary>
Expand Down Expand Up @@ -112,18 +115,98 @@ void Lock<TLockableKey>(TLockableKey[] keys)
/// </summary>
/// <typeparam name="TLockableKey"></typeparam>
/// <param name="keys">keyCodes to be locked, and whether that locking is shared or exclusive; must be sorted by <see cref="SortLockCodes{TLockableKey}(TLockableKey[])"/>.</param>
/// <param name="start">The starting index to Lock</param>
/// <param name="start">The starting key index to Lock</param>
/// <param name="count">The number of keys to Lock</param>
void Lock<TLockableKey>(TLockableKey[] keys, int start, int count)
where TLockableKey : ILockableKey;

/// <summary>
/// Locks the keys identified in the passed array, with retry limits or cancellation.
/// </summary>
/// <typeparam name="TLockableKey"></typeparam>
/// <param name="keys">keyCodes to be locked, and whether that locking is shared or exclusive; must be sorted by <see cref="SortLockCodes{TLockableKey}(TLockableKey[])"/>.</param>
/// <param name="numRetriesPerKey">Maximum number of retries per key:
/// <list>
/// <li>less than 0: ignored; may be cancelled by <paramref name="cancellationToken"/></li>
/// <li>0: fail if first locking try does not succeed</li>
/// <li>greater than 0: retry up to this many times for each key; may be cancelled early by <paramref name="cancellationToken"/></li>
/// </list>
/// </param>
/// <param name="cancellationToken">The cancellation token, if any</param>
bool TryLock<TLockableKey>(TLockableKey[] keys, int numRetriesPerKey, CancellationToken cancellationToken = default)
where TLockableKey : ILockableKey;

/// <summary>
/// Locks the keys identified in the passed array, with retry limits or cancellation.
/// </summary>
/// <typeparam name="TLockableKey"></typeparam>
/// <param name="keys">keyCodes to be locked, and whether that locking is shared or exclusive; must be sorted by <see cref="SortLockCodes{TLockableKey}(TLockableKey[])"/>.</param>
/// <param name="start">The starting key index to Lock</param>
/// <param name="count">The number of keys to Lock</param>
/// <param name="numRetriesPerKey">Maximum number of retries per key:
/// <list>
/// <li>less than 0: ignored; may be cancelled by <paramref name="cancellationToken"/></li>
/// <li>less than or equal to 0: fail if first locking try does not succeed</li>
/// <li>greater than 0: retry up to this many times for each key; may be cancelled early by <paramref name="cancellationToken"/></li>
/// </list>
/// </param>
/// <param name="cancellationToken">The cancellation token, if any</param>
bool TryLock<TLockableKey>(TLockableKey[] keys, int start, int count, int numRetriesPerKey, CancellationToken cancellationToken = default)
where TLockableKey : ILockableKey;

/// <summary>
/// Locks the keys identified in the passed array, with retry limits or cancellation.
/// </summary>
/// <typeparam name="TLockableKey"></typeparam>
/// <param name="keys">keyCodes to be locked, and whether that locking is shared or exclusive; must be sorted by <see cref="SortLockCodes{TLockableKey}(TLockableKey[])"/>.</param>
/// <param name="timeout">TimeSpan limiting the duration of the TryLock() call over all keys.</param>
/// <param name="cancellationToken">The cancellation token, if any</param>
bool TryLock<TLockableKey>(TLockableKey[] keys, TimeSpan timeout, CancellationToken cancellationToken = default)
where TLockableKey : ILockableKey;

/// <summary>
/// Locks the keys identified in the passed array, with retry limits or cancellation.
/// </summary>
/// <typeparam name="TLockableKey"></typeparam>
/// <param name="keys">keyCodes to be locked, and whether that locking is shared or exclusive; must be sorted by <see cref="SortLockCodes{TLockableKey}(TLockableKey[])"/>.</param>
/// <param name="start">The starting key index to Lock</param>
/// <param name="count">The number of keys to Lock</param>
/// <param name="timeout">TimeSpan limiting the duration of the TryLock() call over all keys.</param>
/// <param name="cancellationToken">The cancellation token, if any</param>
bool TryLock<TLockableKey>(TLockableKey[] keys, int start, int count, TimeSpan timeout, CancellationToken cancellationToken = default)
where TLockableKey : ILockableKey;

/// <summary>
/// Locks the keys identified in the passed array, with retry limits or cancellation.
/// </summary>
/// <typeparam name="TLockableKey"></typeparam>
/// <param name="keys">keyCodes to be locked, and whether that locking is shared or exclusive; must be sorted by <see cref="SortLockCodes{TLockableKey}(TLockableKey[])"/>.</param>
/// <param name="cancellationToken">The cancellation token</param>
bool TryLock<TLockableKey>(TLockableKey[] keys, CancellationToken cancellationToken)
where TLockableKey : ILockableKey;

/// <summary>
/// Locks the keys identified in the passed array, with retry limits or cancellation.
/// </summary>
/// <typeparam name="TLockableKey"></typeparam>
/// <param name="keys">keyCodes to be locked, and whether that locking is shared or exclusive; must be sorted by <see cref="SortLockCodes{TLockableKey}(TLockableKey[])"/>.</param>
/// <param name="start">The starting key index to Lock</param>
/// <param name="count">The number of keys to Lock</param>
/// <param name="cancellationToken">The cancellation token, if any</param>
bool TryLock<TLockableKey>(TLockableKey[] keys, int start, int count, CancellationToken cancellationToken)
where TLockableKey : ILockableKey;

/// <inheritdoc/>
bool TryLock<TLockableKey>(TLockableKey[] keys, int start, int count, int numRetries, TimeSpan timeout, CancellationToken cancellationToken)
where TLockableKey : ILockableKey;

/// <summary>
/// Unlocks the keys identified in the passed array.
/// </summary>
/// <typeparam name="TLockableKey"></typeparam>
/// <param name="keys">keyCodes to be unlocked, and whether that unlocking is shared or exclusive; must be sorted by <see cref="SortLockCodes{TLockableKey}(TLockableKey[])"/>.</param>
void Unlock<TLockableKey>(TLockableKey[] keys)
where TLockableKey : ILockableKey;
where TLockableKey : ILockableKey;

/// <summary>
/// Unlocks the keys identified in the passed array.
Expand Down
108 changes: 108 additions & 0 deletions cs/src/core/ClientSession/LockableContext.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT license.

using System;
using System.Diagnostics;
using System.Runtime.CompilerServices;
using System.Threading;
Expand Down Expand Up @@ -99,6 +100,65 @@ internal LockableContext(ClientSession<Key, Value, Input, Output, Context, Funct
}
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
internal static unsafe bool DoInternalTryLock<FasterSession, TLockableKey>(FasterSession fasterSession, ClientSession<Key, Value, Input, Output, Context, Functions> clientSession,
TLockableKey[] keys, int start, int count, int maxRetriesPerKey, TimeSpan timeout, CancellationToken cancellationToken)
where FasterSession : IFasterSession<Key, Value, Input, Output, Context>
where TLockableKey : ILockableKey
{
// The key codes are sorted, but there may be duplicates; the sorting is such that exclusive locks come first for each key code,
// which of course allows the session to do shared operations as well, so we take the first occurrence of each key code.
// Unlock has to be done in the reverse order of locking, so we take the *last* occurrence of each key there.
var end = start + count - 1;
var startTime = timeout.Ticks > 0 ? DateTime.UtcNow : default;

for (int keyIdx = start; keyIdx <= end; ++keyIdx)
{
ref var key = ref keys[keyIdx];
if (keyIdx == start || clientSession.fht.LockTable.GetBucketIndex(key.LockCode) != clientSession.fht.LockTable.GetBucketIndex(keys[keyIdx - 1].LockCode))
{
for (int numRetriesForKey = 0 ; ; )
{
OperationStatus status = clientSession.fht.InternalLock(key.LockCode, new(LockOperationType.Lock, key.LockType));
bool fail = false;
if (status == OperationStatus.SUCCESS)
{
if (key.LockType == LockType.Exclusive)
++clientSession.exclusiveLockCount;
else if (key.LockType == LockType.Shared)
++clientSession.sharedLockCount;

if (keyIdx == end)
goto Success;
}
else
fail = (maxRetriesPerKey >= 0 && ++numRetriesForKey > maxRetriesPerKey);

// CancellationToken can accompany either of the other two mechanisms
fail |= (timeout.Ticks > 0 && DateTime.UtcNow.Ticks - startTime.Ticks > timeout.Ticks) || cancellationToken.IsCancellationRequested;
if (!fail && status == OperationStatus.SUCCESS)
break; // out of the retry loop

clientSession.fht.HandleImmediateNonPendingRetryStatus<Input, Output, Context, FasterSession>(status, fasterSession);

// Failure (including timeout/cancellation after a successful Lock before we've completed all keys). Unlock anything we already locked.
for (int unlockIdx = keyIdx - (status == OperationStatus.SUCCESS ? 0 : 1); unlockIdx >= start; --unlockIdx)
{
var lockType = DoLockOp(fasterSession, clientSession, keys, start, LockOperationType.Unlock, unlockIdx);
if (lockType == LockType.Exclusive)
--clientSession.exclusiveLockCount;
else if (lockType == LockType.Shared)
--clientSession.sharedLockCount;
}
return false;
}
}
}

Success:
return true;
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
private static unsafe LockType DoLockOp<FasterSession, TLockableKey>(FasterSession fasterSession, ClientSession<Key, Value, Input, Output, Context, Functions> clientSession,
TLockableKey[] keys, int start, LockOperationType lockOpType, int idx)
Expand Down Expand Up @@ -139,6 +199,54 @@ public void Lock<TLockableKey>(TLockableKey[] keys, int start, int count)
}
}

/// <inheritdoc/>
public bool TryLock<TLockableKey>(TLockableKey[] keys, int numRetriesPerKey, CancellationToken cancellationToken = default)
where TLockableKey : ILockableKey
=> TryLock(keys, 0, keys.Length, numRetriesPerKey, default, cancellationToken);

/// <inheritdoc/>
public bool TryLock<TLockableKey>(TLockableKey[] keys, int start, int count, int numRetriesPerKey, CancellationToken cancellationToken = default)
where TLockableKey : ILockableKey
=> TryLock(keys, start, count, numRetriesPerKey, default, cancellationToken);

/// <inheritdoc/>
public bool TryLock<TLockableKey>(TLockableKey[] keys, TimeSpan timeout, CancellationToken cancellationToken = default)
where TLockableKey : ILockableKey
=> TryLock(keys, 0, keys.Length, -1, timeout, cancellationToken);

/// <inheritdoc/>
public bool TryLock<TLockableKey>(TLockableKey[] keys, int start, int count, TimeSpan timeout, CancellationToken cancellationToken = default)
where TLockableKey : ILockableKey
=> TryLock(keys, start, count, -1, timeout, cancellationToken);

/// <inheritdoc/>
public bool TryLock<TLockableKey>(TLockableKey[] keys, CancellationToken cancellationToken)
where TLockableKey : ILockableKey
=> TryLock(keys, 0, keys.Length, -1, default, cancellationToken);

/// <inheritdoc/>
public bool TryLock<TLockableKey>(TLockableKey[] keys, int start, int count, CancellationToken cancellationToken)
where TLockableKey : ILockableKey
=> TryLock(keys, start, count, -1, default, cancellationToken);

/// <inheritdoc/>
public bool TryLock<TLockableKey>(TLockableKey[] keys, int start, int count, int numRetries, TimeSpan timeout, CancellationToken cancellationToken)
where TLockableKey : ILockableKey
{
clientSession.CheckIsAcquiredLockable();
Debug.Assert(!clientSession.fht.epoch.ThisInstanceProtected(), "Trying to protect an already-protected epoch for LockableUnsafeContext.Lock()");

clientSession.UnsafeResumeThread();
try
{
return DoInternalTryLock(FasterSession, clientSession, keys, start, count, numRetries, timeout, cancellationToken);
}
finally
{
clientSession.UnsafeSuspendThread();
}
}

/// <inheritdoc/>
public void Unlock<TLockableKey>(TLockableKey[] keys) where TLockableKey : ILockableKey => Unlock(keys, 0, keys.Length);

Expand Down
41 changes: 41 additions & 0 deletions cs/src/core/ClientSession/LockableUnsafeContext.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT license.

using System;
using System.Diagnostics;
using System.Runtime.CompilerServices;
using System.Threading;
Expand Down Expand Up @@ -89,6 +90,46 @@ public void Lock<TLockableKey>(TLockableKey[] keys, int start, int count)
LockableContext<Key, Value, Input, Output, Context, Functions>.DoInternalLockOp(FasterSession, clientSession, keys, start, count, LockOperationType.Lock);
}

/// <inheritdoc/>
public bool TryLock<TLockableKey>(TLockableKey[] keys, int numRetriesPerKey, CancellationToken cancellationToken = default)
where TLockableKey : ILockableKey
=> TryLock(keys, 0, keys.Length, numRetriesPerKey, default, cancellationToken);

/// <inheritdoc/>
public bool TryLock<TLockableKey>(TLockableKey[] keys, int start, int count, int numRetriesPerKey, CancellationToken cancellationToken = default)
where TLockableKey : ILockableKey
=> TryLock(keys, start, count, numRetriesPerKey, default, cancellationToken);

/// <inheritdoc/>
public bool TryLock<TLockableKey>(TLockableKey[] keys, TimeSpan timeout, CancellationToken cancellationToken = default)
where TLockableKey : ILockableKey
=> TryLock(keys, 0, keys.Length, -1, timeout, cancellationToken);

/// <inheritdoc/>
public bool TryLock<TLockableKey>(TLockableKey[] keys, int start, int count, TimeSpan timeout, CancellationToken cancellationToken = default)
where TLockableKey : ILockableKey
=> TryLock(keys, start, count, -1, timeout, cancellationToken);

/// <inheritdoc/>
public bool TryLock<TLockableKey>(TLockableKey[] keys, CancellationToken cancellationToken)
where TLockableKey : ILockableKey
=> TryLock(keys, 0, keys.Length, -1, default, cancellationToken);

/// <inheritdoc/>
public bool TryLock<TLockableKey>(TLockableKey[] keys, int start, int count, CancellationToken cancellationToken)
where TLockableKey : ILockableKey
=> TryLock(keys, start, count, -1, default, cancellationToken);

/// <inheritdoc/>
public bool TryLock<TLockableKey>(TLockableKey[] keys, int start, int count, int numRetries, TimeSpan timeout, CancellationToken cancellationToken)
where TLockableKey : ILockableKey
{
clientSession.CheckIsAcquiredLockable();
Debug.Assert(clientSession.fht.epoch.ThisInstanceProtected(), "Epoch protection required for LockableUnsafeContext.Lock()");

return LockableContext<Key, Value, Input, Output, Context, Functions>.DoInternalTryLock(FasterSession, clientSession, keys, start, count, numRetries, timeout, cancellationToken);
}

/// <inheritdoc/>
public void Unlock<TLockableKey>(TLockableKey[] keys) where TLockableKey : ILockableKey => Unlock(keys, 0, keys.Length);

Expand Down
Loading

0 comments on commit 6e24390

Please sign in to comment.