Skip to content

Commit

Permalink
Cleanup and updates.
Browse files Browse the repository at this point in the history
  • Loading branch information
badrishc committed Oct 7, 2019
1 parent 915f01e commit 089d545
Show file tree
Hide file tree
Showing 4 changed files with 47 additions and 49 deletions.
11 changes: 8 additions & 3 deletions cs/benchmark/FasterYcsbBenchmark.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,13 @@ public enum Op : ulong
ReadModifyWrite = 2
}

const bool kUseSyntheticData = false;
#if DEBUG
const bool kUseSmallData = true;
const bool kUseSyntheticData = true;
#else
const bool kUseSmallData = false;
const bool kUseSyntheticData = false;
#endif
const long kInitCount = kUseSmallData ? 2500480 : 250000000;
const long kTxnCount = kUseSmallData ? 10000000 : 1000000000;
const int kMaxKey = kUseSmallData ? 1 << 22 : 1 << 28;
Expand Down Expand Up @@ -432,7 +437,7 @@ void DoContinuousMeasurements()
}
#endif

#region Load Data
#region Load Data

private unsafe void LoadDataFromFile(string filePath)
{
Expand Down Expand Up @@ -574,7 +579,7 @@ private void LoadSyntheticData()
Console.WriteLine("loaded " + kTxnCount + " txns.");

}
#endregion
#endregion


}
Expand Down
58 changes: 33 additions & 25 deletions cs/src/core/ClientSession/ClientSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#pragma warning disable 0162

using System;
using System.Runtime.CompilerServices;
using System.Threading.Tasks;

namespace FASTER.core
Expand Down Expand Up @@ -34,14 +35,10 @@ internal ClientSession(
this.fht = fht;
this.ctx = ctx;
this.supportAsync = supportAsync;
if (supportAsync)
{
fht.UseRelaxedCPR();
}
else
{

// Session runs on a single thread
if (!supportAsync)
UnsafeResumeThread();
}
}

/// <summary>
Expand All @@ -55,24 +52,10 @@ internal ClientSession(
public void Dispose()
{
CompletePending(true);
fht.DisposeClientSession(ID);
}

/// <summary>
/// Resume session on current thread
/// Call SuspendThread before any async op
/// </summary>
internal void UnsafeResumeThread()
{
fht.ResumeSession(ctx);
}

/// <summary>
/// Suspend session on current thread
/// </summary>
internal void UnsafeSuspendThread()
{
fht.SuspendSession();
// Session runs on a single thread
if (!supportAsync)
UnsafeSuspendThread();
}

/// <summary>
Expand All @@ -84,6 +67,7 @@ internal void UnsafeSuspendThread()
/// <param name="userContext"></param>
/// <param name="serialNo"></param>
/// <returns></returns>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public Status Read(ref Key key, ref Input input, ref Output output, Context userContext, long serialNo)
{
if (supportAsync) UnsafeResumeThread();
Expand All @@ -100,6 +84,7 @@ public Status Read(ref Key key, ref Input input, ref Output output, Context user
/// <param name="userContext"></param>
/// <param name="serialNo"></param>
/// <returns></returns>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public Status Upsert(ref Key key, ref Value desiredValue, Context userContext, long serialNo)
{
if (supportAsync) UnsafeResumeThread();
Expand All @@ -116,6 +101,7 @@ public Status Upsert(ref Key key, ref Value desiredValue, Context userContext, l
/// <param name="userContext"></param>
/// <param name="serialNo"></param>
/// <returns></returns>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public Status RMW(ref Key key, ref Input input, Context userContext, long serialNo)
{
if (supportAsync) UnsafeResumeThread();
Expand All @@ -131,6 +117,7 @@ public Status RMW(ref Key key, ref Input input, Context userContext, long serial
/// <param name="userContext"></param>
/// <param name="serialNo"></param>
/// <returns></returns>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public Status Delete(ref Key key, Context userContext, long serialNo)
{
if (supportAsync) UnsafeResumeThread();
Expand Down Expand Up @@ -189,10 +176,31 @@ public bool CompleteCheckpoint(bool spinWait = false)
/// Complete the ongoing checkpoint (if any)
/// </summary>
/// <returns></returns>
internal async ValueTask CompleteCheckpointAsync()
public async ValueTask CompleteCheckpointAsync()
{
if (!supportAsync) throw new NotSupportedException();
await fht.CompleteCheckpointAsync(ctx, this);
}

/// <summary>
/// Resume session on current thread
/// Call SuspendThread before any async op
/// </summary>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
internal void UnsafeResumeThread()
{
fht.epoch.Acquire();
fht.InternalRefresh(ctx);
}

/// <summary>
/// Suspend session on current thread
/// </summary>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
internal void UnsafeSuspendThread()
{
fht.epoch.Release();
}

}
}
2 changes: 0 additions & 2 deletions cs/src/core/ClientSession/FASTERAsync.cs
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,7 @@ internal async ValueTask CompletePendingAsync(ClientSession<Key, Value, Input, O

if (clientSession.ctx.prevCtx.retryRequests.Count > 0)
{
clientSession.UnsafeResumeThread();
CompleteRetryRequests(clientSession.ctx.prevCtx, clientSession.ctx, clientSession);
clientSession.UnsafeSuspendThread();
}

done &= (clientSession.ctx.prevCtx.ioPendingRequests.Count == 0);
Expand Down
25 changes: 6 additions & 19 deletions cs/src/core/ClientSession/FASTERClientSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ public unsafe partial class FasterKV<Key, Value, Input, Output, Context, Functio
/// <returns></returns>
public ClientSession<Key, Value, Input, Output, Context, Functions> StartClientSession(bool supportAsync = true)
{
if (supportAsync)
UseRelaxedCPR();

Guid guid = Guid.NewGuid();
var ctx = new FasterExecutionContext();
InitContext(ctx, guid);
Expand Down Expand Up @@ -52,6 +55,9 @@ public ClientSession<Key, Value, Input, Output, Context, Functions> StartClientS
/// <returns></returns>
public ClientSession<Key, Value, Input, Output, Context, Functions> ContinueClientSession(Guid guid, out CommitPoint cp, bool supportAsync = true)
{
if (supportAsync)
UseRelaxedCPR();

cp = InternalContinue(guid, out FasterExecutionContext ctx);
if (cp.UntilSerialNo == -1)
throw new Exception($"Unable to find session {guid} to recover");
Expand All @@ -74,25 +80,6 @@ internal void DisposeClientSession(Guid guid)
{
lock (_activeSessions)
_activeSessions.Remove(guid);
epoch.Release();
}

/// <summary>
/// Resume session with FASTER
/// </summary>
/// <param name="ctx">Context</param>
internal void ResumeSession(FasterExecutionContext ctx)
{
epoch.Resume();
InternalRefresh(ctx);
}

/// <summary>
/// Suspend session with FASTER
/// </summary>
internal void SuspendSession()
{
epoch.Suspend();
}
}
}

0 comments on commit 089d545

Please sign in to comment.