Skip to content

Commit

Permalink
Cleaned up epochs, improved fine grain scalability.
Browse files Browse the repository at this point in the history
  • Loading branch information
badrishc committed Sep 18, 2019
1 parent 853b3ea commit 6315a14
Show file tree
Hide file tree
Showing 7 changed files with 145 additions and 59 deletions.
8 changes: 2 additions & 6 deletions cs/playground/FasterLogSample/Program.cs
Original file line number Diff line number Diff line change
@@ -1,20 +1,16 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT license.

using FASTER.core;
using FASTER.core.log;
using System;
using System.Diagnostics;
using System.Diagnostics.Eventing.Reader;
using System.IO;
using System.Runtime.CompilerServices;
using System.Threading;
using FASTER.core;

namespace FasterLogSample
{
public class Program
{
const int entryLength = 100;
const int entryLength = 96;
static FasterLog log;

static void ReportThread()
Expand Down
86 changes: 44 additions & 42 deletions cs/src/core/Epochs/LightEpoch.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,8 @@

namespace FASTER.core
{

/// <summary>
///
/// Epoch protection
/// </summary>
public unsafe class LightEpoch
{
Expand All @@ -37,18 +36,17 @@ public unsafe class LightEpoch
private GCHandle tableHandle;
private Entry* tableAligned;

private static Entry[] threadIndex;
private static GCHandle threadIndexHandle;
private static Entry* threadIndexAligned;

/// <summary>
/// List of action, epoch pairs containing actions to performed
/// when an epoch becomes safe to reclaim.
/// </summary>
private int drainCount = 0;
private readonly EpochActionPair[] drainList = new EpochActionPair[kDrainListSize];

/// <summary>
/// Number of entries in the epoch table
/// </summary>
private int numEntries;

/// <summary>
/// A thread's entry in the epoch table.
/// </summary>
Expand All @@ -61,6 +59,9 @@ public unsafe class LightEpoch
[ThreadStatic]
private static int threadEntryIndexCount;

[ThreadStatic]
static int threadId;

/// <summary>
/// Global current epoch value
/// </summary>
Expand All @@ -72,25 +73,28 @@ public unsafe class LightEpoch
public int SafeToReclaimEpoch;

/// <summary>
/// Instantiate the epoch table
/// Static constructor to setup shared cache-aligned space
/// to store per-entry count of instances using that entry
/// </summary>
/// <param name="size"></param>
public LightEpoch(int size = kTableSize)
static LightEpoch()
{
Initialize(size);
// Over-allocate to do cache-line alignment
threadIndex = new Entry[kTableSize + 2];
threadIndexHandle = GCHandle.Alloc(threadIndex, GCHandleType.Pinned);
long p = (long)threadIndexHandle.AddrOfPinnedObject();

// Force the pointer to align to 64-byte boundaries
long p2 = (p + (Constants.kCacheLineBytes - 1)) & ~(Constants.kCacheLineBytes - 1);
threadIndexAligned = (Entry*)p2;
}

/// <summary>
/// Initialize the epoch table
/// Instantiate the epoch table
/// </summary>
/// <param name="size"></param>
unsafe void Initialize(int size)
public LightEpoch()
{
// threadEntryIndex = new FastThreadLocal<int>();
numEntries = size;

// Over-allocate to do cache-line alignment
tableRaw = new Entry[size + 2];
tableRaw = new Entry[kTableSize + 2];
tableHandle = GCHandle.Alloc(tableRaw, GCHandleType.Pinned);
long p = (long)tableHandle.AddrOfPinnedObject();

Expand All @@ -114,12 +118,8 @@ public void Dispose()
tableHandle.Free();
tableAligned = null;
tableRaw = null;

numEntries = 0;
CurrentEpoch = 1;
SafeToReclaimEpoch = 0;

// threadEntryIndex.Dispose();
}

/// <summary>
Expand All @@ -140,6 +140,7 @@ public int ProtectAndDrain()
{
int entry = threadEntryIndex;

(*(tableAligned + entry)).threadId = threadEntryIndex;
(*(tableAligned + entry)).localCurrentEpoch = CurrentEpoch;

if (drainCount > 0)
Expand Down Expand Up @@ -180,6 +181,7 @@ private void Drain(int nextEpoch)
/// <summary>
/// Thread acquires its epoch entry
/// </summary>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public void Acquire()
{
if (threadEntryIndex == kInvalidIndex)
Expand All @@ -191,20 +193,18 @@ public void Acquire()
/// <summary>
/// Thread releases its epoch entry
/// </summary>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public void Release()
{
int entry = threadEntryIndex;
if (kInvalidIndex == entry)
{
return;
}
(*(tableAligned + entry)).localCurrentEpoch = 0;
(*(tableAligned + entry)).threadId = 0;

threadEntryIndexCount--;
if (threadEntryIndexCount == 0)
{
(threadIndexAligned + threadEntryIndex)->threadId = 0;
threadEntryIndex = kInvalidIndex;
(*(tableAligned + entry)).localCurrentEpoch = 0;
(*(tableAligned + entry)).threadId = 0;
}
}

Expand All @@ -214,7 +214,7 @@ public void Release()
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public void Suspend()
{
(*(tableAligned + threadEntryIndex)).localCurrentEpoch = int.MaxValue;
Release();
}

/// <summary>
Expand All @@ -223,8 +223,7 @@ public void Suspend()
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public void Resume()
{
if (threadEntryIndex == kInvalidIndex)
Acquire();
Acquire();
ProtectAndDrain();
}

Expand Down Expand Up @@ -307,7 +306,7 @@ private int ComputeNewSafeToReclaimEpoch(int currentEpoch)
{
int oldestOngoingCall = currentEpoch;

for (int index = 1; index <= numEntries; ++index)
for (int index = 1; index <= kTableSize; ++index)
{
int entry_epoch = (*(tableAligned + index)).localCurrentEpoch;
if (0 != entry_epoch)
Expand All @@ -332,20 +331,20 @@ private int ComputeNewSafeToReclaimEpoch(int currentEpoch)
/// <param name="startIndex">Start index</param>
/// <param name="threadId">Thread id</param>
/// <returns>Reserved entry</returns>
private int ReserveEntry(int startIndex, int threadId)
private static int ReserveEntry(int startIndex, int threadId)
{
int current_iteration = 0;
for (; ; )
{
// Reserve an entry in the table.
for (int i = 0; i < numEntries; ++i)
for (int i = 0; i < kTableSize; ++i)
{
int index_to_test = 1 + ((startIndex + i) & (numEntries - 1));
if (0 == (*(tableAligned + index_to_test)).threadId)
int index_to_test = 1 + ((startIndex + i) & (kTableSize - 1));
if (0 == (threadIndexAligned + index_to_test)->threadId)
{
bool success =
(0 == Interlocked.CompareExchange(
ref (*(tableAligned + index_to_test)).threadId,
ref (threadIndexAligned+index_to_test)->threadId,
threadId, 0));

if (success)
Expand All @@ -356,7 +355,7 @@ private int ReserveEntry(int startIndex, int threadId)
++current_iteration;
}

if (current_iteration > (numEntries * 3))
if (current_iteration > (kTableSize * 10))
{
throw new Exception("Unable to reserve an epoch entry, try increasing the epoch table size (kTableSize)");
}
Expand All @@ -368,10 +367,13 @@ private int ReserveEntry(int startIndex, int threadId)
/// once for a thread.
/// </summary>
/// <returns>Reserved entry</returns>
private int ReserveEntryForThread()
private static int ReserveEntryForThread()
{
// for portability(run on non-windows platform)
int threadId = Environment.OSVersion.Platform == PlatformID.Win32NT ? (int)Native32.GetCurrentThreadId() : Thread.CurrentThread.ManagedThreadId;
if (threadId == 0) // run once per thread for performance
{
// For portability(run on non-windows platform)
threadId = Environment.OSVersion.Platform == PlatformID.Win32NT ? (int)Native32.GetCurrentThreadId() : Thread.CurrentThread.ManagedThreadId;
}
int startIndex = Utility.Murmur3(threadId);
return ReserveEntry(startIndex, threadId);
}
Expand Down Expand Up @@ -429,7 +431,7 @@ public bool MarkAndCheckIsComplete(int markerIdx, int version)
(*(tableAligned + entry)).markers[markerIdx] = version;

// check if all threads have reported complete
for (int index = 1; index <= numEntries; ++index)
for (int index = 1; index <= kTableSize; ++index)
{
int entry_epoch = (*(tableAligned + index)).localCurrentEpoch;
int fc_version = (*(tableAligned + index)).markers[markerIdx];
Expand Down
33 changes: 26 additions & 7 deletions cs/src/core/Index/FasterLog/FasterLog.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,14 @@
using System.Diagnostics;
using System.Runtime.CompilerServices;
using System.Threading;
using FASTER.core;

namespace FASTER.core.log
namespace FASTER.core
{

/// <summary>
/// FASTER log
/// </summary>
public class FasterLog
public class FasterLog : IDisposable
{
private readonly BlittableAllocator<Empty, byte> allocator;
private readonly LightEpoch epoch;
Expand All @@ -43,11 +42,20 @@ public class FasterLog
/// <param name="logSettings"></param>
public FasterLog(FasterLogSettings logSettings)
{
this.epoch = new LightEpoch();
epoch = new LightEpoch();
allocator = new BlittableAllocator<Empty, byte>(logSettings.GetLogSettings(), null, null, epoch);
allocator.Initialize();
}

/// <summary>
/// Dispose
/// </summary>
public void Dispose()
{
allocator.Dispose();
epoch.Dispose();
}

/// <summary>
/// Append entry to log
/// </summary>
Expand Down Expand Up @@ -146,10 +154,21 @@ public FasterLogScanIterator Scan(long beginAddress, long endAddress, ScanBuffer
}

/// <summary>
/// Dispose this thread's epoch entry. Use when you manage your own
/// threads and want to recycle a thread-local epoch entry.
/// Create and pin epoch entry for this thread - use with ReleaseThread
/// if you manage the thread.
/// DO NOT USE WITH ASYNC CODE
/// </summary>
public void AcquireThread()
{
epoch.Acquire();
}

/// <summary>
/// Dispose epoch entry for this thread. Use with AcquireThread
/// if you manage the thread.
/// DO NOT USE WITH ASYNC CODE
/// </summary>
public void DisposeThread()
public void ReleaseThread()
{
epoch.Release();
}
Expand Down
4 changes: 2 additions & 2 deletions cs/src/core/Index/FasterLog/FasterLogIterator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
using System.Threading;
using System.Diagnostics;

namespace FASTER.core.log
namespace FASTER.core
{
/// <summary>
/// Scan iterator for hybrid log
Expand Down Expand Up @@ -36,7 +36,7 @@ public class FasterLogScanIterator : IDisposable
/// <param name="endAddress"></param>
/// <param name="scanBufferingMode"></param>
/// <param name="epoch"></param>
public unsafe FasterLogScanIterator(BlittableAllocator<Empty, byte> hlog, long beginAddress, long endAddress, ScanBufferingMode scanBufferingMode, LightEpoch epoch)
internal unsafe FasterLogScanIterator(BlittableAllocator<Empty, byte> hlog, long beginAddress, long endAddress, ScanBufferingMode scanBufferingMode, LightEpoch epoch)
{
this.allocator = hlog;
this.epoch = epoch;
Expand Down
2 changes: 1 addition & 1 deletion cs/src/core/Index/FasterLog/FasterLogSettings.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

#pragma warning disable 0162

namespace FASTER.core.log
namespace FASTER.core
{
/// <summary>
/// FASTER Log Settings
Expand Down
Loading

0 comments on commit 6315a14

Please sign in to comment.