Skip to content

Commit

Permalink
[WIP] Read cache for FASTER (#92)
Browse files Browse the repository at this point in the history
* Added support for best effort delete, i.e., clean up hash entries if possible + delete value if it is in the mutable region. This is an experimental feature - do not take dependency yet. Use it by calling fht.DeleteFromMutable(...).
* Added support for a read cache, which is another instance of the "log" allocator that is used to cache read-only data. Eviction is based on the hybrid log idea -- have a "eviction region" (reusing tyhe hybrid log read-only region), where records have a second chance to go back to the tail, before they are evicted entirely from the cache.
* Current limitations of read cache: (1) RMW operations are not served from the cache, but instead retrieve the record from storage; (2) checkpointing of the hash index is not adjusted to eliminate pointers to read cache.
* Fixed race condition on page flush status, fix broken reads in AnyCPU config, other bug fixes, improved test coverage. Updated ClassCache sample.
* Separated log-related operations into its own LogAccessor object, accessible from FasterKv.
* Ensure ReadOnlyLagAddress does not exceed HeadOffsetLagAddress (#108).
  • Loading branch information
badrishc committed Mar 12, 2019
1 parent c73e99b commit e664f0d
Show file tree
Hide file tree
Showing 27 changed files with 1,356 additions and 238 deletions.
4 changes: 2 additions & 2 deletions cs/benchmark/FasterYcsbBenchmark.cs
Expand Up @@ -243,7 +243,7 @@ public unsafe void Run()
sw.Stop();
Console.WriteLine("Loading time: {0}ms", sw.ElapsedMilliseconds);

long startTailAddress = store.LogTailAddress;
long startTailAddress = store.Log.TailAddress;
Console.WriteLine("Start tail address = " + startTailAddress);


Expand Down Expand Up @@ -296,7 +296,7 @@ public unsafe void Run()
#endif

double seconds = swatch.ElapsedMilliseconds / 1000.0;
long endTailAddress = store.LogTailAddress;
long endTailAddress = store.Log.TailAddress;
Console.WriteLine("End tail address = " + endTailAddress);

Console.WriteLine("Total " + total_ops_done + " ops done " + " in " + seconds + " secs.");
Expand Down
35 changes: 27 additions & 8 deletions cs/playground/ClassCache/Program.cs
Expand Up @@ -14,16 +14,26 @@ namespace ClassCache
{
class Program
{
// Whether we use reade cache in this sample
static readonly bool useReadCache = true;

static void Main(string[] args)
{
var context = default(CacheContext);

var log = Devices.CreateLogDevice(Path.GetTempPath() + "hlog.log", deleteOnClose: true);
var objlog = Devices.CreateLogDevice(Path.GetTempPath() + "hlog.obj.log", deleteOnClose: true);

var logSettings = new LogSettings { LogDevice = log, ObjectLogDevice = objlog };

if (useReadCache)
{
logSettings.ReadCacheSettings = new ReadCacheSettings();
}

var h = new FasterKV
<CacheKey, CacheValue, CacheInput, CacheOutput, CacheContext, CacheFunctions>(
1L << 20, new CacheFunctions(),
new LogSettings { LogDevice = log, ObjectLogDevice = objlog },
1L << 20, new CacheFunctions(), logSettings,
null,
new SerializerSettings<CacheKey, CacheValue> { keySerializer = () => new CacheKeySerializer(), valueSerializer = () => new CacheValueSerializer() }
);
Expand Down Expand Up @@ -54,12 +64,18 @@ static void Main(string[] args)
sw.Stop();
Console.WriteLine("Total time to upsert {0} elements: {1:0.000} secs ({2:0.00} inserts/sec)", max, sw.ElapsedMilliseconds/1000.0, max / (sw.ElapsedMilliseconds / 1000.0));

// Uncomment below to copy entire log to disk
// h.ShiftReadOnlyAddress(h.LogTailAddress);
// Uncomment below to copy entire log to disk, but retain tail of log in memory
// h.Log.Flush(true);

// Uncomment below to move entire log to disk
// and eliminate data from memory as well
// h.ShiftHeadAddress(h.LogTailAddress, true);
// Uncomment below to move entire log to disk and eliminate data from memory as
// well. This will serve workload entirely from disk using read cache if enabled.
// This will *allow* future updates to the store.
// h.Log.FlushAndEvict(true);

// Uncomment below to move entire log to disk and eliminate data from memory as
// well. This will serve workload entirely from disk using read cache if enabled.
// This will *prevent* future updates to the store.
h.Log.DisposeFromMemory();

Console.Write("Enter read workload type (0 = random reads; 1 = interactive): ");
var workload = int.Parse(Console.ReadLine());
Expand Down Expand Up @@ -96,7 +112,10 @@ private static void RandomReadWorkload(FasterKV<CacheKey, CacheValue, CacheInput
switch (status)
{
case Status.PENDING:
statusPending++; break;
statusPending++;
if (statusPending % 1000 == 0)
h.CompletePending(false);
break;
case Status.OK:
if (output.value.value != key.key)
throw new Exception("Read error!");
Expand Down
1 change: 0 additions & 1 deletion cs/playground/StructSampleCore/Program.cs
Expand Up @@ -30,7 +30,6 @@ static void Sample1()

fht.StartSession();


fht.Upsert(ref key, ref value, Empty.Default, 0);
fht.Read(ref key, ref input, ref output, Empty.Default, 0);
if (output == value)
Expand Down
82 changes: 47 additions & 35 deletions cs/src/core/Allocator/AllocatorBase.cs
Expand Up @@ -17,9 +17,12 @@ internal enum PMMFlushStatus : int { Flushed, InProgress };

internal enum PMMCloseStatus : int { Closed, Open };

[StructLayout(LayoutKind.Explicit)]
internal struct FullPageStatus
{
[FieldOffset(0)]
public long LastFlushedUntilAddress;
[FieldOffset(8)]
public FlushCloseStatus PageFlushCloseStatus;
}

Expand Down Expand Up @@ -206,6 +209,16 @@ public unsafe abstract class AllocatorBase<Key, Value> : IDisposable
/// </summary>
protected SectorAlignedBufferPool readBufferPool;

/// <summary>
/// Read cache
/// </summary>
protected readonly bool ReadCache = false;

/// <summary>
/// Read cache eviction callback
/// </summary>
protected readonly Action<long, long> EvictCallback = null;

#region Abstract methods
/// <summary>
/// Initialize
Expand Down Expand Up @@ -389,6 +402,23 @@ public unsafe abstract class AllocatorBase<Key, Value> : IDisposable

#endregion


/// <summary>
///
/// </summary>
/// <param name="settings"></param>
/// <param name="comparer"></param>
/// <param name="evictCallback"></param>
public AllocatorBase(LogSettings settings, IFasterEqualityComparer<Key> comparer, Action<long, long> evictCallback)
: this(settings, comparer)
{
if (evictCallback != null)
{
ReadCache = true;
EvictCallback = evictCallback;
}
}

/// <summary>
/// Instantiate base allocator
/// </summary>
Expand All @@ -415,9 +445,9 @@ public AllocatorBase(LogSettings settings, IFasterEqualityComparer<Key> comparer
HeadOffsetLagSize = BufferSize - HeadOffsetLagNumPages;
HeadOffsetLagAddress = (long)HeadOffsetLagSize << LogPageSizeBits;

// ReadOnlyOffset lag (from tail)
// ReadOnlyOffset lag (from tail). This should not exceed HeadOffset lag.
LogMutableFraction = settings.MutableFraction;
ReadOnlyLagAddress = (long)(LogMutableFraction * BufferSize) << LogPageSizeBits;
ReadOnlyLagAddress = Math.Min((long)(LogMutableFraction * BufferSize) << LogPageSizeBits, HeadOffsetLagAddress);

// Segment size
LogSegmentSizeBits = settings.SegmentSizeBits;
Expand Down Expand Up @@ -485,6 +515,11 @@ public virtual void Dispose()
BeginAddress = 1;
}

/// <summary>
/// Delete in-memory portion of the log
/// </summary>
internal abstract void DeleteFromMemory();

/// <summary>
/// Segment size
/// </summary>
Expand Down Expand Up @@ -927,6 +962,9 @@ private void PageAlignedShiftHeadAddress(long currentTailAddress)
}
newHeadAddress = newHeadAddress & ~PageSizeMask;

if (ReadCache && (newHeadAddress > HeadAddress))
EvictCallback(HeadAddress, newHeadAddress);

if (MonotonicUpdate(ref HeadAddress, newHeadAddress, out long oldHeadAddress))
{
Debug.WriteLine("Allocate: Moving head offset from {0:X} to {1:X}", oldHeadAddress, newHeadAddress);
Expand All @@ -949,40 +987,15 @@ public long ShiftHeadAddress(long desiredHeadAddress)
newHeadAddress = currentFlushedUntilAddress;
}

if (MonotonicUpdate(ref HeadAddress, newHeadAddress, out long oldHeadAddress))
{
Debug.WriteLine("Allocate: Moving head offset from {0:X} to {1:X}", oldHeadAddress, newHeadAddress);
epoch.BumpCurrentEpoch(() => OnPagesClosed(newHeadAddress));
return newHeadAddress;
}
return oldHeadAddress;
}

/// <summary>
/// Called whenever a new tail page is allocated or when the user is checking for a failed memory allocation
/// Tries to shift head address based on the head offset lag size.
/// </summary>
/// <param name="desiredHeadAddress"></param>
private void PageAlignedShiftHeadAddressToValue(long desiredHeadAddress)
{
//obtain local values of variables that can change
long currentHeadAddress = HeadAddress;
long currentFlushedUntilAddress = FlushedUntilAddress;

desiredHeadAddress = desiredHeadAddress & ~PageSizeMask;

long newHeadAddress = desiredHeadAddress;
if (currentFlushedUntilAddress < newHeadAddress)
{
newHeadAddress = currentFlushedUntilAddress;
}
newHeadAddress = newHeadAddress & ~PageSizeMask;
if (ReadCache && (newHeadAddress > HeadAddress))
EvictCallback(HeadAddress, newHeadAddress);

if (MonotonicUpdate(ref HeadAddress, newHeadAddress, out long oldHeadAddress))
{
Debug.WriteLine("Allocate: Moving head offset from {0:X} to {1:X}", oldHeadAddress, newHeadAddress);
epoch.BumpCurrentEpoch(() => OnPagesClosed(newHeadAddress));
}
return newHeadAddress;
}

/// <summary>
Expand All @@ -995,13 +1008,13 @@ protected void ShiftFlushedUntilAddress()
long page = GetPage(currentFlushedUntilAddress);

bool update = false;
long pageLastFlushedAddress = PageStatusIndicator[(int)(page % BufferSize)].LastFlushedUntilAddress;
long pageLastFlushedAddress = Interlocked.Read(ref PageStatusIndicator[(int)(page % BufferSize)].LastFlushedUntilAddress);
while (pageLastFlushedAddress >= currentFlushedUntilAddress)
{
currentFlushedUntilAddress = pageLastFlushedAddress;
update = true;
page++;
pageLastFlushedAddress = PageStatusIndicator[(int)(page % BufferSize)].LastFlushedUntilAddress;
pageLastFlushedAddress = Interlocked.Read(ref PageStatusIndicator[(int)(page % BufferSize)].LastFlushedUntilAddress);
}

if (update)
Expand Down Expand Up @@ -1261,7 +1274,7 @@ public void AsyncFlushPages(long fromAddress, long untilAddress)
= new FlushCloseStatus { PageFlushStatus = PMMFlushStatus.InProgress, PageCloseStatus = PMMCloseStatus.Open };
}

PageStatusIndicator[flushPage % BufferSize].LastFlushedUntilAddress = -1;
Interlocked.Exchange(ref PageStatusIndicator[flushPage % BufferSize].LastFlushedUntilAddress, -1);

WriteAsync(flushPage, AsyncFlushPageCallback, asyncResult);
}
Expand Down Expand Up @@ -1430,8 +1443,6 @@ private void AsyncFlushPageCallback(uint errorCode, uint numBytes, NativeOverlap

if (Interlocked.Decrement(ref result.count) == 0)
{
PageStatusIndicator[result.page % BufferSize].LastFlushedUntilAddress = result.untilAddress;

if (!result.partial || (result.untilAddress >= ((result.page + 1) << LogPageSizeBits)))
{
while (true)
Expand All @@ -1449,6 +1460,7 @@ private void AsyncFlushPageCallback(uint errorCode, uint numBytes, NativeOverlap
}
}
}
Interlocked.Exchange(ref PageStatusIndicator[result.page % BufferSize].LastFlushedUntilAddress, result.untilAddress);
ShiftFlushedUntilAddress();
result.Free();
}
Expand Down
32 changes: 26 additions & 6 deletions cs/src/core/Allocator/BlittableAllocator.cs
Expand Up @@ -31,8 +31,8 @@ public unsafe sealed class BlittableAllocator<Key, Value> : AllocatorBase<Key, V
private static readonly int keySize = Utility.GetSize(default(Key));
private static readonly int valueSize = Utility.GetSize(default(Value));

public BlittableAllocator(LogSettings settings, IFasterEqualityComparer<Key> comparer)
: base(settings, comparer)
public BlittableAllocator(LogSettings settings, IFasterEqualityComparer<Key> comparer, Action<long, long> evictCallback = null)
: base(settings, comparer, evictCallback)
{
values = new byte[BufferSize][];
handles = new GCHandle[BufferSize];
Expand Down Expand Up @@ -94,11 +94,14 @@ public override int GetRecordSize(ref Key key, ref Value value)
/// </summary>
public override void Dispose()
{
for (int i = 0; i < values.Length; i++)
if (values != null)
{
if (handles[i].IsAllocated)
handles[i].Free();
values[i] = null;
for (int i = 0; i < values.Length; i++)
{
if (handles[i].IsAllocated)
handles[i].Free();
values[i] = null;
}
}
handles = null;
pointers = null;
Expand Down Expand Up @@ -197,6 +200,23 @@ protected override void ClearPage(long page)
Array.Clear(values[page % BufferSize], 0, values[page % BufferSize].Length);
}

/// <summary>
/// Delete in-memory portion of the log
/// </summary>
internal override void DeleteFromMemory()
{
for (int i = 0; i < values.Length; i++)
{
if (handles[i].IsAllocated)
handles[i].Free();
values[i] = null;
}
handles = null;
pointers = null;
values = null;
}


private void WriteAsync<TContext>(IntPtr alignedSourceAddress, ulong alignedDestinationAddress, uint numBytesToWrite,
IOCompletionCallback callback, PageAsyncFlushResult<TContext> asyncResult,
IDevice device)
Expand Down

0 comments on commit e664f0d

Please sign in to comment.