Skip to content

Commit

Permalink
Actually checking in support for 1 page in memory, added initial draft of disposing task
Browse files Browse the repository at this point in the history
  • Loading branch information
badrishc committed Oct 7, 2019
1 parent 8e42a74 commit c55de3f
Show file tree
Hide file tree
Showing 4 changed files with 27 additions and 231 deletions.
208 changes: 16 additions & 192 deletions cs/src/core/Allocator/AllocatorBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -105,16 +105,8 @@ public unsafe abstract partial class AllocatorBase<Key, Value> : IDisposable
/// <summary>
/// HeadOffset lag (from tail)
/// </summary>
protected const int HeadOffsetLagNumPages = 2;
protected readonly bool HeadOffsetExtraLag;

/// <summary>
/// HeadOffset lag (from tail) for ReadCache
/// </summary>
protected const int ReadCacheHeadOffsetLagNumPages = 2;
/// <summary>
/// HeadOffset lag size
/// </summary>
protected readonly int HeadOffsetLagSize;
/// <summary>
/// HeadOffset lag address
/// </summary>
Expand Down Expand Up @@ -181,10 +173,6 @@ public unsafe abstract partial class AllocatorBase<Key, Value> : IDisposable
#endregion

#region Private page metadata
/// <summary>
/// Index in circular buffer, of the current tail page
/// </summary>
private volatile int TailPageCache;

// Array that indicates the status of each buffer page
internal readonly FullPageStatus[] PageStatusIndicator;
Expand Down Expand Up @@ -503,8 +491,10 @@ public AllocatorBase(LogSettings settings, IFasterEqualityComparer<Key> comparer
BufferSizeMask = BufferSize - 1;

// HeadOffset lag (from tail).
HeadOffsetLagSize = BufferSize - (ReadCache ? ReadCacheHeadOffsetLagNumPages : HeadOffsetLagNumPages);
HeadOffsetLagAddress = (long)HeadOffsetLagSize << LogPageSizeBits;
var headOffsetLagSize = BufferSize - 1; // (ReadCache ? ReadCacheHeadOffsetLagNumPages : HeadOffsetLagNumPages);
if (BufferSize > 1 && HeadOffsetExtraLag) headOffsetLagSize--;

HeadOffsetLagAddress = (long)headOffsetLagSize << LogPageSizeBits;

// ReadOnlyOffset lag (from tail). This should not exceed HeadOffset lag.
LogMutableFraction = settings.MutableFraction;
Expand All @@ -515,9 +505,12 @@ public AllocatorBase(LogSettings settings, IFasterEqualityComparer<Key> comparer
SegmentSize = 1 << LogSegmentSizeBits;
SegmentBufferSize = 1 + (LogTotalSizeBytes / SegmentSize < 1 ? 1 : (int)(LogTotalSizeBytes / SegmentSize));

if (BufferSize < 4)
if (SegmentSize < PageSize)
throw new Exception("Segment must be at least of page size");

if (BufferSize < 1)
{
throw new Exception("HLOG buffer must be at least 4 pages");
throw new Exception("Log buffer must be of size at least 1 page");
}

PageStatusIndicator = new FullPageStatus[BufferSize];
Expand All @@ -542,8 +535,11 @@ protected void Initialize(long firstValidAddress)
AllocatePage(tailPageIndex);

// Allocate next page as well
if (firstValidAddress > 0)
AllocatePage(tailPageIndex + 1);
int nextPageIndex = (int)(tailPage + 1) % BufferSize;
if ((!IsAllocated(nextPageIndex)))
{
AllocatePage(nextPageIndex);
}

SafeReadOnlyAddress = firstValidAddress;
ReadOnlyAddress = firstValidAddress;
Expand All @@ -555,8 +551,6 @@ protected void Initialize(long firstValidAddress)

TailPageOffset.Page = (int)(firstValidAddress >> LogPageSizeBits);
TailPageOffset.Offset = (int)(firstValidAddress & PageSizeMask);

TailPageCache = 0;
}

/// <summary>
Expand Down Expand Up @@ -685,15 +679,6 @@ public long GetOffsetInPage(long address)
return address & PageSizeMask;
}

/// <summary>
/// Get offset lag in pages
/// </summary>
/// <returns></returns>
public long GetHeadOffsetLagInPages()
{
return HeadOffsetLagSize;
}

/// <summary>
/// Get sector size for main hlog device
/// </summary>
Expand All @@ -703,121 +688,6 @@ public int GetDeviceSectorSize()
return sectorSize;
}

/// <summary>
/// Key function used to allocate memory for a specified number of items
/// </summary>
/// <param name="numSlots"></param>
/// <returns></returns>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public long Allocate(int numSlots = 1)
{
PageOffset localTailPageOffset = default(PageOffset);

// Determine insertion index.
// ReSharper disable once CSharpWarnings::CS0420
#pragma warning disable 420
localTailPageOffset.PageAndOffset = Interlocked.Add(ref TailPageOffset.PageAndOffset, numSlots);
#pragma warning restore 420

int page = localTailPageOffset.Page;
int offset = localTailPageOffset.Offset - numSlots;

#region HANDLE PAGE OVERFLOW
/* To prove correctness of the following modifications
* done to TailPageOffset and the allocation itself,
* we should use the fact that only one thread will have any
* of the following cases since it is a counter and we spin-wait
* until the tail is folded onto next page accordingly.
*/
if (localTailPageOffset.Offset >= PageSize)
{
if (offset >= PageSize)
{
//The tail offset value was more than page size before atomic add
//We consider that a failed attempt and retry again
var spin = new SpinWait();
do
{
//Just to give some more time to the thread
// that is handling this overflow
while (TailPageOffset.Offset >= PageSize)
{
spin.SpinOnce();
}

// ReSharper disable once CSharpWarnings::CS0420
#pragma warning disable 420
localTailPageOffset.PageAndOffset = Interlocked.Add(ref TailPageOffset.PageAndOffset, numSlots);
#pragma warning restore 420

page = localTailPageOffset.Page;
offset = localTailPageOffset.Offset - numSlots;
} while (offset >= PageSize);
}


if (localTailPageOffset.Offset == PageSize)
{
//Folding over at page boundary
localTailPageOffset.Page++;
localTailPageOffset.Offset = 0;
TailPageOffset = localTailPageOffset;
}
else if (localTailPageOffset.Offset >= PageSize)
{
//Overflows not allowed. We allot same space in next page.
localTailPageOffset.Page++;
localTailPageOffset.Offset = numSlots;
TailPageOffset = localTailPageOffset;

page = localTailPageOffset.Page;
offset = 0;
}
}
#endregion

long address = (((long)page) << LogPageSizeBits) | ((long)offset);

// Check for TailPageCache hit
if (TailPageCache == page)
{
return (address);
}

// Negate the address if page not ready to be used
if (CannotAllocateNext(page))
{
address = -address;
}

// Update the read-only so that we can get more space for the tail
if (offset == 0)
{
if (address >= 0)
{
TailPageCache = page;
Interlocked.MemoryBarrier();
}

int newPageIndex = (page + 1) % BufferSize;
long tailAddress = (address < 0 ? -address : address);
PageAlignedShiftReadOnlyAddress(tailAddress);
PageAlignedShiftHeadAddress(tailAddress);

if ((!IsAllocated(newPageIndex)))
{
AllocatePage(newPageIndex);
}

// We refreshed epoch, so address may have
// become read-only; re-check
if (tailAddress < ReadOnlyAddress)
return Allocate(numSlots);
}

return address;
}

/// <summary>
/// Try allocate, no thread spinning allowed
/// May return 0 in case of inability to allocate
Expand Down Expand Up @@ -889,56 +759,12 @@ public long TryAllocate(int numSlots = 1)
return (((long)page) << LogPageSizeBits) | ((long)offset);
}

/// <summary>
/// If allocator cannot allocate new memory as the head has not shifted or the previous page
/// is not yet closed, it allocates but returns the negative address.
/// This function is invoked to check if the address previously allocated has become valid to be used
/// </summary>
/// <param name="address"></param>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public void CheckForAllocateComplete(ref long address)
{
PageOffset p = default(PageOffset);
p.Page = (int)((-address) >> LogPageSizeBits);
p.Offset = (int)((-address) & PageSizeMask);

// Check write cache
if (TailPageCache == p.Page)
{
address = -address;
return;
}

PageAlignedShiftHeadAddress(GetTailAddress());

// Check if we can allocate pageIndex
if (CannotAllocateNext(p.Page))
{
return;
}

//correct values and set write cache
address = -address;
if (p.Offset == 0)
{
TailPageCache = p.Page;
}
return;
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
private bool CannotAllocateNext(int page)
{
return
(page >= BufferSize + (ClosedUntilAddress >> LogPageSizeBits) - 1) ||
!IsAllocated(page % BufferSize);
}

private bool CannotAllocate(int page)
{
return
(page >= BufferSize + (ClosedUntilAddress >> LogPageSizeBits));
}

/// <summary>
/// Used by applications to make the current state of the database immutable quickly
/// </summary>
Expand Down Expand Up @@ -1072,7 +898,6 @@ private void DebugPrintAddresses(long closePageAddress)
Console.WriteLine("SafeHead: {0}.{1}", GetPage(_safehead), GetOffsetInPage(_safehead));
Console.WriteLine("ReadOnly: {0}.{1}", GetPage(_readonly), GetOffsetInPage(_readonly));
Console.WriteLine("SafeReadOnly: {0}.{1}", GetPage(_safereadonly), GetOffsetInPage(_safereadonly));
Console.WriteLine("TailPageCache: {0}", TailPageCache);
}

/// <summary>
Expand Down Expand Up @@ -1213,7 +1038,6 @@ public void RecoveryReset(long tailAddress, long headAddress, long beginAddress)
long offsetInPage = GetOffsetInPage(tailAddress);
TailPageOffset.Page = (int)tailPage;
TailPageOffset.Offset = (int)offsetInPage;
TailPageCache = TailPageOffset.Page;

// allocate next page as well - this is an invariant in the allocator!
var pageIndex = (TailPageOffset.Page % BufferSize);
Expand Down
40 changes: 4 additions & 36 deletions cs/src/core/Index/FASTER/FASTERImpl.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1887,52 +1887,20 @@ private void HeavyEnter(long hash)
[MethodImpl(MethodImplOptions.AggressiveInlining)]
private void BlockAllocate(int recordSize, out long logicalAddress)
{
logicalAddress = hlog.Allocate(recordSize);
if (logicalAddress >= 0) return;

while (logicalAddress < 0 && -logicalAddress >= hlog.ReadOnlyAddress)
while ((logicalAddress = hlog.TryAllocate(recordSize)) == 0)
{
InternalRefresh();
hlog.CheckForAllocateComplete(ref logicalAddress);
if (logicalAddress < 0)
{
Thread.Yield();
}
}

logicalAddress = logicalAddress < 0 ? -logicalAddress : logicalAddress;

if (logicalAddress < hlog.ReadOnlyAddress)
{
Debug.WriteLine("Allocated address is read-only, retrying");
BlockAllocate(recordSize, out logicalAddress);
Thread.Yield();
}
}


[MethodImpl(MethodImplOptions.AggressiveInlining)]
private void BlockAllocateReadCache(int recordSize, out long logicalAddress)
{
logicalAddress = readcache.Allocate(recordSize);
if (logicalAddress >= 0)
return;

while (logicalAddress < 0 && -logicalAddress >= readcache.ReadOnlyAddress)
while ((logicalAddress = readcache.TryAllocate(recordSize)) == 0)
{
InternalRefresh();
readcache.CheckForAllocateComplete(ref logicalAddress);
if (logicalAddress < 0)
{
Thread.Yield();
}
}

logicalAddress = logicalAddress < 0 ? -logicalAddress : logicalAddress;

if (logicalAddress < readcache.ReadOnlyAddress)
{
Debug.WriteLine("Allocated address is read-only, retrying");
BlockAllocateReadCache(recordSize, out logicalAddress);
Thread.Yield();
}
}

Expand Down
4 changes: 3 additions & 1 deletion cs/src/core/Index/FasterLog/FasterLog.cs
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ public void Dispose()
{
allocator.Dispose();
epoch.Dispose();
commitTcs.TrySetException(new ObjectDisposedException("Log has been disposed"));
}

/// <summary>
Expand Down Expand Up @@ -399,7 +400,8 @@ private void Commit(long flushAddress)
}

_commitTask = commitTcs;
commitTcs = _newCommitTask;
if (commitTcs.Task.Status != TaskStatus.Faulted)
commitTcs = _newCommitTask;
}
_commitTask.SetResult(flushAddress);
}
Expand Down
6 changes: 4 additions & 2 deletions cs/src/core/Index/FasterLog/FasterLogSettings.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,20 @@ public class FasterLogSettings
public IDevice LogDevice = new NullDevice();

/// <summary>
/// Size of a segment (group of pages), in bits
/// Size of a page, in bits
/// </summary>
public int PageSizeBits = 22;

/// <summary>
/// Total size of in-memory part of log, in bits
/// Should be at least one page long
/// Num pages = 2^(MemorySizeBits-PageSizeBits)
/// </summary>
public int MemorySizeBits = 24;
public int MemorySizeBits = 23;

/// <summary>
/// Size of a segment (group of pages), in bits
/// This is the granularity of files on disk
/// </summary>
public int SegmentSizeBits = 30;

Expand Down

0 comments on commit c55de3f

Please sign in to comment.