Skip to content

Commit

Permalink
[C#] Convert *UnsafeContext to structs and acquire on demand (#750)
Browse files Browse the repository at this point in the history
* Make *UnsafeContext a struct and acquire on demand (like LockableContext); change Resume/SuspendThread naming to Begin/EndUnsafe and add Begin/EndLockable

* Add AcquireLockable check for in-prepare checkpoint
  • Loading branch information
TedHartMS committed Sep 23, 2022
1 parent ea54b90 commit 2f55e90
Show file tree
Hide file tree
Showing 20 changed files with 584 additions and 613 deletions.
14 changes: 6 additions & 8 deletions cs/benchmark/FasterSpanByteYcsbBenchmark.cs
Original file line number Diff line number Diff line change
Expand Up @@ -125,8 +125,8 @@ private void RunYcsbUnsafeContext(int thread_idx)
#endif

var session = store.For(functions).NewSession<FunctionsSB>();
var uContext = session.GetUnsafeContext();
uContext.ResumeThread();
var uContext = session.UnsafeContext;
uContext.BeginUnsafe();

try
{
Expand Down Expand Up @@ -203,10 +203,9 @@ private void RunYcsbUnsafeContext(int thread_idx)
}
finally
{
uContext.SuspendThread();
uContext.EndUnsafe();
}

uContext.Dispose();
session.Dispose();

sw.Stop();
Expand Down Expand Up @@ -479,8 +478,8 @@ private void SetupYcsbUnsafeContext(int thread_idx)
waiter.Wait();

var session = store.For(functions).NewSession<FunctionsSB>();
var uContext = session.GetUnsafeContext();
uContext.ResumeThread();
var uContext = session.UnsafeContext;
uContext.BeginUnsafe();

#if DASHBOARD
var tstart = Stopwatch.GetTimestamp();
Expand Down Expand Up @@ -531,9 +530,8 @@ private void SetupYcsbUnsafeContext(int thread_idx)
}
finally
{
uContext.SuspendThread();
uContext.EndUnsafe();
}
uContext.Dispose();
session.Dispose();
}

Expand Down
19 changes: 9 additions & 10 deletions cs/benchmark/FasterYcsbBenchmark.cs
Original file line number Diff line number Diff line change
Expand Up @@ -121,8 +121,8 @@ private void RunYcsbUnsafeContext(int thread_idx)
#endif

var session = store.For(functions).NewSession<Functions>();
var uContext = session.GetUnsafeContext();
uContext.ResumeThread();
var uContext = session.UnsafeContext;
uContext.BeginUnsafe();

try
{
Expand Down Expand Up @@ -199,10 +199,9 @@ private void RunYcsbUnsafeContext(int thread_idx)
}
finally
{
uContext.SuspendThread();
uContext.EndUnsafe();
}

uContext.Dispose();
session.Dispose();

sw.Stop();
Expand Down Expand Up @@ -319,7 +318,8 @@ internal unsafe (double, double) Run(TestLoader testLoader)
if (testLoader.Options.LockImpl == (int)LockImpl.Manual)
{
session = store.For(functions).NewSession<Functions>();
luContext = session.GetLockableUnsafeContext();
luContext = session.LockableUnsafeContext;
luContext.BeginLockable();

Console.WriteLine("Taking 2 manual locks");
luContext.Lock(xlock.key, xlock.kind);
Expand Down Expand Up @@ -436,7 +436,7 @@ internal unsafe (double, double) Run(TestLoader testLoader)
{
luContext.Unlock(xlock.key, xlock.kind);
luContext.Unlock(slock.key, slock.kind);
luContext.Dispose();
luContext.EndLockable();
session.Dispose();
}

Expand Down Expand Up @@ -467,8 +467,8 @@ private void SetupYcsbUnsafeContext(int thread_idx)
waiter.Wait();

var session = store.For(functions).NewSession<Functions>();
var uContext = session.GetUnsafeContext();
uContext.ResumeThread();
var uContext = session.UnsafeContext;
uContext.BeginUnsafe();

#if DASHBOARD
var tstart = Stopwatch.GetTimestamp();
Expand Down Expand Up @@ -518,9 +518,8 @@ private void SetupYcsbUnsafeContext(int thread_idx)
}
finally
{
uContext.SuspendThread();
uContext.EndUnsafe();
}
uContext.Dispose();
session.Dispose();
}

Expand Down
3 changes: 3 additions & 0 deletions cs/src/core/ClientSession/BasicContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ namespace FASTER.core
{
readonly ClientSession<Key, Value, Input, Output, Context, Functions> clientSession;

/// <summary>Indicates whether this struct has been initialized</summary>
public bool IsNull => this.clientSession is null;

internal BasicContext(ClientSession<Key, Value, Input, Output, Context, Functions> clientSession)
{
this.clientSession = clientSession;
Expand Down
114 changes: 72 additions & 42 deletions cs/src/core/ClientSession/ClientSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
Expand Down Expand Up @@ -35,8 +36,8 @@ public sealed class ClientSession<Key, Value, Input, Output, Context, Functions>

internal readonly InternalFasterSession FasterSession;

UnsafeContext<Key, Value, Input, Output, Context, Functions> uContext;
LockableUnsafeContext<Key, Value, Input, Output, Context, Functions> luContext;
readonly UnsafeContext<Key, Value, Input, Output, Context, Functions> uContext;
readonly LockableUnsafeContext<Key, Value, Input, Output, Context, Functions> luContext;
readonly LockableContext<Key, Value, Input, Output, Context, Functions> lContext;
readonly BasicContext<Key, Value, Input, Output, Context, Functions> bContext;

Expand All @@ -49,32 +50,53 @@ public sealed class ClientSession<Key, Value, Input, Output, Context, Functions>
internal ulong sharedLockCount;
internal ulong exclusiveLockCount;

bool isAcquired;
bool isAcquiredLockable;

internal void Acquire()
internal void AcquireLockable()
{
CheckNotAcquired();
fht.IncrementNumLockingSessions();
isAcquired = true;
CheckIsNotAcquiredLockable();

while (true)
{
// Checkpoints cannot complete while we have active locking sessions.
while (IsInPreparePhase())
Thread.Yield();

fht.IncrementNumLockingSessions();
isAcquiredLockable = true;

if (!IsInPreparePhase())
break;
InternalReleaseLockable();
Thread.Yield();
}
}

internal void Release()
internal void ReleaseLockable()
{
CheckAcquired();
isAcquired = false;
CheckIsAcquiredLockable();
if (TotalLockCount > 0)
throw new FasterException($"EndLockable called with locks held: {sharedLockCount} shared locks, {exclusiveLockCount} exclusive locks");
InternalReleaseLockable();
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
private void InternalReleaseLockable()
{
isAcquiredLockable = false;
fht.DecrementNumLockingSessions();
}

internal void CheckAcquired()
internal void CheckIsAcquiredLockable()
{
if (!isAcquired)
throw new FasterException("Method call on not-acquired Context");
if (!isAcquiredLockable)
throw new FasterException("Lockable method call when BeginLockable has not been called");
}

void CheckNotAcquired()
void CheckIsNotAcquiredLockable()
{
if (isAcquired)
throw new FasterException("Method call on acquired Context");
if (isAcquiredLockable)
throw new FasterException("BeginLockable cannot be called twice (call EndLockable first)");
}

internal ClientSession(
Expand All @@ -86,6 +108,8 @@ void CheckNotAcquired()
{
this.lContext = new(this);
this.bContext = new(this);
this.luContext = new(this);
this.uContext = new(this);

this.loggerFactory = loggerFactory;
this.logger = loggerFactory?.CreateLogger($"ClientSession-{GetHashCode():X8}");
Expand Down Expand Up @@ -199,22 +223,12 @@ public void Dispose()
/// <summary>
/// Return a new interface to Faster operations that supports manual epoch control.
/// </summary>
public UnsafeContext<Key, Value, Input, Output, Context, Functions> GetUnsafeContext()
{
this.uContext ??= new(this);
this.uContext.Acquire();
return this.uContext;
}
public UnsafeContext<Key, Value, Input, Output, Context, Functions> UnsafeContext => uContext;

/// <summary>
/// Return a new interface to Faster operations that supports manual locking and epoch control.
/// </summary>
public LockableUnsafeContext<Key, Value, Input, Output, Context, Functions> GetLockableUnsafeContext()
{
this.luContext ??= new(this);
this.luContext.Acquire();
return this.luContext;
}
public LockableUnsafeContext<Key, Value, Input, Output, Context, Functions> LockableUnsafeContext => luContext;

/// <summary>
/// Return a session wrapper that supports manual locking.
Expand Down Expand Up @@ -655,40 +669,51 @@ public async ValueTask ReadyToCompletePendingAsync(CancellationToken token = def
#region Other Operations

/// <inheritdoc/>
public unsafe void ResetModified(ref Key key)
public void ResetModified(ref Key key)
{
UnsafeResumeThread();
try
{
OperationStatus status;
do
status = fht.InternalModifiedBitOperation(ref key, out _);
while (fht.HandleImmediateNonPendingRetryStatus(status, ctx, FasterSession));
UnsafeResetModified(ref key);
}
finally
{
UnsafeSuspendThread();
}
}

internal void UnsafeResetModified(ref Key key)
{
OperationStatus status;
do
status = fht.InternalModifiedBitOperation(ref key, out _);
while (fht.HandleImmediateNonPendingRetryStatus(status, ctx, FasterSession));
}

/// <inheritdoc/>
public unsafe void ResetModified(Key key) => ResetModified(ref key);

/// <inheritdoc/>
internal unsafe bool IsModified(ref Key key)
internal bool IsModified(ref Key key)
{
RecordInfo modifiedInfo;
UnsafeResumeThread();
try
{
OperationStatus status;
do
status = fht.InternalModifiedBitOperation(ref key, out modifiedInfo, false);
while (fht.HandleImmediateNonPendingRetryStatus(status, ctx, FasterSession));
return UnsafeIsModified(ref key);
}
finally
{
UnsafeSuspendThread();
}
}

internal bool UnsafeIsModified(ref Key key)
{
RecordInfo modifiedInfo;
OperationStatus status;
do
status = fht.InternalModifiedBitOperation(ref key, out modifiedInfo, false);
while (fht.HandleImmediateNonPendingRetryStatus(status, ctx, FasterSession));
return modifiedInfo.Modified;
}

Expand Down Expand Up @@ -848,6 +873,8 @@ internal Status ContainsKeyInMemory(ref Key key, out long logicalAddress, long f
[MethodImpl(MethodImplOptions.AggressiveInlining)]
internal void UnsafeResumeThread()
{
// We do not track any "acquired" state here; if someone mixes calls between safe and unsafe contexts, they will
// get the "trying to acquire already-acquired epoch" error.
fht.epoch.Resume();
fht.InternalRefresh(ctx, FasterSession);
}
Expand All @@ -856,8 +883,11 @@ internal void UnsafeResumeThread()
/// Suspend session on current thread
/// </summary>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
internal void UnsafeSuspendThread()
=> fht.epoch.Suspend();
internal void UnsafeSuspendThread()
{
Debug.Assert(fht.epoch.ThisInstanceProtected());
fht.epoch.Suspend();
}

void IClientSession.AtomicSwitch(long version)
{
Expand All @@ -867,7 +897,7 @@ void IClientSession.AtomicSwitch(long version)
/// <summary>
/// Return true if Faster State Machine is in PREPARE sate
/// </summary>
public bool IsInPreparePhase()
internal bool IsInPreparePhase()
{
return this.fht.SystemState.Phase == Phase.PREPARE;
}
Expand Down
10 changes: 10 additions & 0 deletions cs/src/core/ClientSession/ILockableContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,16 @@ namespace FASTER.core
/// <typeparam name="TKey"></typeparam>
public interface ILockableContext<TKey>
{
/// <summary>
/// Begins a series of lock operations on possibly multiple keys; call before any locks are taken.
/// </summary>
void BeginLockable();

/// <summary>
/// Ends a series of lock operations on possibly multiple keys; call after all locks are released.
/// </summary>
void EndLockable();

/// <summary>
/// Lock the key with the specified <paramref name="lockType"/>, waiting until it is acquired
/// </summary>
Expand Down
6 changes: 3 additions & 3 deletions cs/src/core/ClientSession/IUnsafeContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,13 @@ namespace FASTER.core
public interface IUnsafeContext
{
/// <summary>
/// Resume session on current thread. IMPORTANT: Call SuspendThread before any async op.
/// Resume session on current thread. IMPORTANT: Call <see cref="EndUnsafe"/> before any async op.
/// </summary>
void ResumeThread();
void BeginUnsafe();

/// <summary>
/// Suspend session on current thread
/// </summary>
void SuspendThread();
void EndUnsafe();
}
}

0 comments on commit 2f55e90

Please sign in to comment.