Skip to content

Commit

Permalink
Removing statics from codebase (#117)
Browse files Browse the repository at this point in the history
* Making LightEpoch instance specific
* Created thread + instance based thread local variable mechanism
* Fixed clean up in testcases
* Close memory stream after calling EndSerialize
* Making DumpDistribution return a string instead of writing to console
* Removed redundant EndSerialize
* Do not share buffer pools across instances.
* Remove extra buffer copy
* Fixing issue where LightEpoch.IsProtected dereferences FastThreadLocal.values for threads that haven't initialized values yet
* Fixes to addresses, cleanup object read logic
* Fix addresses and ranges for reading objects
* Do not spin on read when queue is full, if we are reading from unprotected IO thread
* Changed SpinWait to Yield for async reads
  • Loading branch information
badrishc committed Apr 5, 2019
1 parent 0b4b88c commit 7517324
Show file tree
Hide file tree
Showing 24 changed files with 525 additions and 433 deletions.
2 changes: 1 addition & 1 deletion cs/benchmark/FasterYcsbBenchmark.cs
Expand Up @@ -248,7 +248,7 @@ public unsafe void Run()


idx_ = 0;
store.DumpDistribution();
Console.WriteLine(store.DumpDistribution());

Console.WriteLine("Executing experiment.");

Expand Down
2 changes: 0 additions & 2 deletions cs/playground/SumStore/RecoveryTest.cs
Expand Up @@ -19,15 +19,13 @@ class RecoveryTest
const long completePendingInterval = 1 << 12;
const int checkpointInterval = 10 * 1000;
readonly int threadCount;
readonly int numActiveThreads;
FasterKV<AdId, NumClicks, Input, Output, Empty, Functions> fht;

BlockingCollection<Input[]> inputArrays;

public RecoveryTest(int threadCount)
{
this.threadCount = threadCount;
numActiveThreads = 0;

// Create FASTER index
var log = Devices.CreateLogDevice("logs\\hlog");
Expand Down
81 changes: 63 additions & 18 deletions cs/src/core/Allocator/AllocatorBase.cs
Expand Up @@ -60,7 +60,8 @@ public unsafe abstract class AllocatorBase<Key, Value> : IDisposable
/// <summary>
/// Epoch information
/// </summary>
protected LightEpoch epoch;
protected readonly LightEpoch epoch;
private readonly bool ownedEpoch;

/// <summary>
/// Comparer
Expand Down Expand Up @@ -201,13 +202,13 @@ public unsafe abstract class AllocatorBase<Key, Value> : IDisposable
/// <summary>
/// Number of pending reads
/// </summary>
private static int numPendingReads = 0;
private int numPendingReads = 0;
#endregion

/// <summary>
/// Read buffer pool
/// Buffer pool
/// </summary>
protected SectorAlignedBufferPool readBufferPool;
protected SectorAlignedBufferPool bufferPool;

/// <summary>
/// Read cache
Expand Down Expand Up @@ -415,8 +416,9 @@ public unsafe abstract class AllocatorBase<Key, Value> : IDisposable
/// <param name="settings"></param>
/// <param name="comparer"></param>
/// <param name="evictCallback"></param>
public AllocatorBase(LogSettings settings, IFasterEqualityComparer<Key> comparer, Action<long, long> evictCallback)
: this(settings, comparer)
/// <param name="epoch"></param>
public AllocatorBase(LogSettings settings, IFasterEqualityComparer<Key> comparer, Action<long, long> evictCallback, LightEpoch epoch)
: this(settings, comparer, epoch)
{
if (evictCallback != null)
{
Expand All @@ -430,9 +432,18 @@ public AllocatorBase(LogSettings settings, IFasterEqualityComparer<Key> comparer
/// </summary>
/// <param name="settings"></param>
/// <param name="comparer"></param>
public AllocatorBase(LogSettings settings, IFasterEqualityComparer<Key> comparer)
/// <param name="epoch"></param>
public AllocatorBase(LogSettings settings, IFasterEqualityComparer<Key> comparer, LightEpoch epoch)
{
this.comparer = comparer;
if (epoch == null)
{
this.epoch = new LightEpoch();
ownedEpoch = true;
}
else
this.epoch = epoch;

settings.LogDevice.Initialize(1L << settings.SegmentSizeBits);
settings.ObjectLogDevice?.Initialize(1L << settings.SegmentSizeBits);

Expand Down Expand Up @@ -481,7 +492,7 @@ protected void Initialize(long firstValidAddress)
Debug.Assert(firstValidAddress <= PageSize);
Debug.Assert(PageSize >= GetRecordSize(0));

readBufferPool = SectorAlignedBufferPool.GetPool(1, sectorSize);
bufferPool = new SectorAlignedBufferPool(1, sectorSize);

long tailPage = firstValidAddress >> LogPageSizeBits;
int tailPageIndex = (int)(tailPage % BufferSize);
Expand All @@ -504,6 +515,24 @@ protected void Initialize(long firstValidAddress)
TailPageIndex = 0;
}

/// <summary>
/// Acquire thread
/// </summary>
public void Acquire()
{
if (ownedEpoch)
epoch.Acquire();
}

/// <summary>
/// Release thread
/// </summary>
public void Release()
{
if (ownedEpoch)
epoch.Release();
}

/// <summary>
/// Dispose allocator
/// </summary>
Expand All @@ -519,6 +548,10 @@ public virtual void Dispose()
SafeHeadAddress = 0;
HeadAddress = 0;
BeginAddress = 1;

if (ownedEpoch)
epoch.Dispose();
bufferPool.Free();
}

/// <summary>
Expand Down Expand Up @@ -1120,7 +1153,7 @@ internal void AsyncReadRecordToMemory(long fromLogical, int numBytes, IOCompleti
uint alignedReadLength = (uint)((long)fileOffset + numBytes - (long)alignedFileOffset);
alignedReadLength = (uint)((alignedReadLength + (sectorSize - 1)) & ~(sectorSize - 1));

var record = readBufferPool.Get((int)alignedReadLength);
var record = bufferPool.Get((int)alignedReadLength);
record.valid_offset = (int)(fileOffset - alignedFileOffset);
record.available_bytes = (int)(alignedReadLength - (fileOffset - alignedFileOffset));
record.required_bytes = numBytes;
Expand All @@ -1141,6 +1174,7 @@ internal void AsyncReadRecordToMemory(long fromLogical, int numBytes, IOCompleti
/// <typeparam name="TContext"></typeparam>
/// <param name="readPageStart"></param>
/// <param name="numPages"></param>
/// <param name="untilAddress"></param>
/// <param name="callback"></param>
/// <param name="context"></param>
/// <param name="devicePageOffset"></param>
Expand All @@ -1149,12 +1183,13 @@ internal void AsyncReadRecordToMemory(long fromLogical, int numBytes, IOCompleti
public void AsyncReadPagesFromDevice<TContext>(
long readPageStart,
int numPages,
long untilAddress,
IOCompletionCallback callback,
TContext context,
long devicePageOffset = 0,
IDevice logDevice = null, IDevice objectLogDevice = null)
{
AsyncReadPagesFromDevice(readPageStart, numPages, callback, context,
AsyncReadPagesFromDevice(readPageStart, numPages, untilAddress, callback, context,
out CountdownEvent completed, devicePageOffset, logDevice, objectLogDevice);
}

Expand All @@ -1164,6 +1199,7 @@ internal void AsyncReadRecordToMemory(long fromLogical, int numBytes, IOCompleti
/// <typeparam name="TContext"></typeparam>
/// <param name="readPageStart"></param>
/// <param name="numPages"></param>
/// <param name="untilAddress"></param>
/// <param name="callback"></param>
/// <param name="context"></param>
/// <param name="completed"></param>
Expand All @@ -1173,6 +1209,7 @@ internal void AsyncReadRecordToMemory(long fromLogical, int numBytes, IOCompleti
private void AsyncReadPagesFromDevice<TContext>(
long readPageStart,
int numPages,
long untilAddress,
IOCompletionCallback callback,
TContext context,
out CountdownEvent completed,
Expand Down Expand Up @@ -1205,15 +1242,24 @@ internal void AsyncReadRecordToMemory(long fromLogical, int numBytes, IOCompleti
page = readPage,
context = context,
handle = completed,
count = 1
maxPtr = PageSize
};

ulong offsetInFile = (ulong)(AlignedPageSizeBytes * readPage);
uint readLength = (uint)AlignedPageSizeBytes;
long adjustedUntilAddress = (AlignedPageSizeBytes * (untilAddress >> LogPageSizeBits) + (untilAddress & PageSizeMask));

if (adjustedUntilAddress > 0 && ((adjustedUntilAddress - (long)offsetInFile) < PageSize))
{
readLength = (uint)(adjustedUntilAddress - (long)offsetInFile);
asyncResult.maxPtr = readLength;
readLength = (uint)((readLength + (sectorSize - 1)) & ~(sectorSize - 1));
}

if (device != null)
offsetInFile = (ulong)(AlignedPageSizeBytes * (readPage - devicePageOffset));

ReadAsync(offsetInFile, pageIndex, (uint)PageSize, callback, asyncResult, usedDevice, usedObjlogDevice);
ReadAsync(offsetInFile, pageIndex, readLength, callback, asyncResult, usedDevice, usedObjlogDevice);
}
}

Expand Down Expand Up @@ -1361,14 +1407,13 @@ public void AsyncFlushPagesToDevice(long startPage, long endPage, long endLogica
AsyncIOContext<Key, Value> context,
SectorAlignedMemory result = default(SectorAlignedMemory))
{
while (numPendingReads > 120)
if (epoch.IsProtected()) // Do not spin for unprotected IO threads
{
Thread.SpinWait(100);

// Do not protect if we are not already protected
// E.g., we are in an IO thread
if (epoch.IsProtected())
while (numPendingReads > 120)
{
Thread.Yield();
epoch.ProtectAndDrain();
}
}
Interlocked.Increment(ref numPendingReads);

Expand Down
20 changes: 14 additions & 6 deletions cs/src/core/Allocator/BlittableAllocator.cs
Expand Up @@ -31,15 +31,13 @@ public unsafe sealed class BlittableAllocator<Key, Value> : AllocatorBase<Key, V
private static readonly int keySize = Utility.GetSize(default(Key));
private static readonly int valueSize = Utility.GetSize(default(Value));

public BlittableAllocator(LogSettings settings, IFasterEqualityComparer<Key> comparer, Action<long, long> evictCallback = null)
: base(settings, comparer, evictCallback)
public BlittableAllocator(LogSettings settings, IFasterEqualityComparer<Key> comparer, Action<long, long> evictCallback = null, LightEpoch epoch = null)
: base(settings, comparer, evictCallback, epoch)
{
values = new byte[BufferSize][];
handles = new GCHandle[BufferSize];
pointers = new long[BufferSize];

epoch = LightEpoch.Instance;

ptrHandle = GCHandle.Alloc(pointers, GCHandleType.Pinned);
nativePointers = (long*)ptrHandle.AddrOfPinnedObject();
}
Expand Down Expand Up @@ -333,6 +331,7 @@ internal override void PopulatePage(byte* src, int required_bytes, long destinat
/// <typeparam name="TContext"></typeparam>
/// <param name="readPageStart"></param>
/// <param name="numPages"></param>
/// <param name="untilAddress"></param>
/// <param name="callback"></param>
/// <param name="context"></param>
/// <param name="frame"></param>
Expand All @@ -343,6 +342,7 @@ internal override void PopulatePage(byte* src, int required_bytes, long destinat
internal void AsyncReadPagesFromDeviceToFrame<TContext>(
long readPageStart,
int numPages,
long untilAddress,
IOCompletionCallback callback,
TContext context,
BlittableFrame frame,
Expand Down Expand Up @@ -375,16 +375,24 @@ internal override void PopulatePage(byte* src, int required_bytes, long destinat
page = readPage,
context = context,
handle = completed,
count = 1,
frame = frame
};

ulong offsetInFile = (ulong)(AlignedPageSizeBytes * readPage);

uint readLength = (uint)AlignedPageSizeBytes;
long adjustedUntilAddress = (AlignedPageSizeBytes * (untilAddress >> LogPageSizeBits) + (untilAddress & PageSizeMask));

if (adjustedUntilAddress > 0 && ((adjustedUntilAddress - (long)offsetInFile) < PageSize))
{
readLength = (uint)(adjustedUntilAddress - (long)offsetInFile);
readLength = (uint)((readLength + (sectorSize - 1)) & ~(sectorSize - 1));
}

if (device != null)
offsetInFile = (ulong)(AlignedPageSizeBytes * (readPage - devicePageOffset));

usedDevice.ReadAsync(offsetInFile, (IntPtr)frame.pointers[pageIndex], (uint)AlignedPageSizeBytes, callback, asyncResult);
usedDevice.ReadAsync(offsetInFile, (IntPtr)frame.pointers[pageIndex], readLength, callback, asyncResult);
}
}
}
Expand Down
6 changes: 3 additions & 3 deletions cs/src/core/Allocator/BlittableScanIterator.cs
Expand Up @@ -61,7 +61,7 @@ public unsafe BlittableScanIterator(BlittableAllocator<Key, Value> hlog, long be
var frameNumber = (nextAddress >> hlog.LogPageSizeBits) % frameSize;
hlog.AsyncReadPagesFromDeviceToFrame
(nextAddress >> hlog.LogPageSizeBits,
1, AsyncReadPagesCallback, Empty.Default,
1, endAddress, AsyncReadPagesCallback, Empty.Default,
frame, out loaded[frameNumber]);
}
}
Expand Down Expand Up @@ -160,7 +160,7 @@ private unsafe void BufferAndLoad(long currentAddress, long currentPage, long cu
{
if (!first)
{
hlog.AsyncReadPagesFromDeviceToFrame(currentAddress >> hlog.LogPageSizeBits, 1, AsyncReadPagesCallback, Empty.Default, frame, out loaded[currentFrame]);
hlog.AsyncReadPagesFromDeviceToFrame(currentAddress >> hlog.LogPageSizeBits, 1, endAddress, AsyncReadPagesCallback, Empty.Default, frame, out loaded[currentFrame]);
}
}
else
Expand All @@ -169,7 +169,7 @@ private unsafe void BufferAndLoad(long currentAddress, long currentPage, long cu
if ((endPage > currentPage) &&
((endPage > currentPage + 1) || ((endAddress & hlog.PageSizeMask) != 0)))
{
hlog.AsyncReadPagesFromDeviceToFrame(1 + (currentAddress >> hlog.LogPageSizeBits), 1, AsyncReadPagesCallback, Empty.Default, frame, out loaded[(currentPage + 1) % frameSize]);
hlog.AsyncReadPagesFromDeviceToFrame(1 + (currentAddress >> hlog.LogPageSizeBits), 1, endAddress, AsyncReadPagesCallback, Empty.Default, frame, out loaded[(currentPage + 1) % frameSize]);
}
}
first = false;
Expand Down

0 comments on commit 7517324

Please sign in to comment.