Skip to content

Commit

Permalink
Updates
Browse files Browse the repository at this point in the history
  • Loading branch information
badrishc committed Sep 17, 2019
1 parent 3077f52 commit 853b3ea
Show file tree
Hide file tree
Showing 4 changed files with 81 additions and 75 deletions.
2 changes: 1 addition & 1 deletion cs/playground/FasterLogSample/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ static void ScanThread()
static void Main(string[] args)
{
var device = Devices.CreateLogDevice("D:\\logs\\hlog.log");
log = new FasterLog(new FasterLogSettings { LogDevice = device, MemorySizeBits = 29, PageSizeBits = 25 });
log = new FasterLog(new FasterLogSettings { LogDevice = device });

new Thread(new ThreadStart(AppendThread)).Start();
new Thread(new ThreadStart(ScanThread)).Start();
Expand Down
39 changes: 1 addition & 38 deletions cs/src/core/Index/FasterLog/FasterLog.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,51 +13,14 @@

namespace FASTER.core.log
{
public class FasterLogSettings
{
/// <summary>
/// Device used for log
/// </summary>
public IDevice LogDevice = new NullDevice();

/// <summary>
/// Size of a segment (group of pages), in bits
/// </summary>
public int PageSizeBits = 22;

/// <summary>
/// Size of a segment (group of pages), in bits
/// </summary>
public int SegmentSizeBits = 30;

/// <summary>
/// Total size of in-memory part of log, in bits
/// </summary>
public int MemorySizeBits = 34;

internal LogSettings GetLogSettings()
{
return new LogSettings
{
LogDevice = LogDevice,
PageSizeBits = PageSizeBits,
SegmentSizeBits = SegmentSizeBits,
MemorySizeBits = MemorySizeBits,
CopyReadsToTail = false,
MutableFraction = 0,
ObjectLogDevice = null,
ReadCacheSettings = null
};
}
}

/// <summary>
/// FASTER log
/// </summary>
public class FasterLog
{
private readonly BlittableAllocator<Empty, byte> allocator;
public readonly LightEpoch epoch;
private readonly LightEpoch epoch;

/// <summary>
/// Beginning address of log
Expand Down
67 changes: 31 additions & 36 deletions cs/src/core/Index/FasterLog/FasterLogIterator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,14 @@ public class FasterLogScanIterator : IDisposable
{
private readonly int frameSize;
private readonly BlittableAllocator<Empty, byte> allocator;
private readonly long beginAddress, endAddress;
private readonly long endAddress;
private readonly BlittableFrame frame;
private readonly CountdownEvent[] loaded;
private readonly long[] loadedPage;
private readonly LightEpoch epoch;

private bool first = true;
private long currentAddress, nextAddress;
private long currentPhysicalAddress;
private LightEpoch epoch;


/// <summary>
/// Current address
Expand All @@ -35,6 +35,7 @@ public class FasterLogScanIterator : IDisposable
/// <param name="beginAddress"></param>
/// <param name="endAddress"></param>
/// <param name="scanBufferingMode"></param>
/// <param name="epoch"></param>
public unsafe FasterLogScanIterator(BlittableAllocator<Empty, byte> hlog, long beginAddress, long endAddress, ScanBufferingMode scanBufferingMode, LightEpoch epoch)
{
this.allocator = hlog;
Expand All @@ -43,7 +44,6 @@ public unsafe FasterLogScanIterator(BlittableAllocator<Empty, byte> hlog, long b
if (beginAddress == 0)
beginAddress = hlog.GetFirstValidLogicalAddress(0);

this.beginAddress = beginAddress;
this.endAddress = endAddress;
currentAddress = -1;
nextAddress = beginAddress;
Expand All @@ -60,16 +60,10 @@ public unsafe FasterLogScanIterator(BlittableAllocator<Empty, byte> hlog, long b

frame = new BlittableFrame(frameSize, hlog.PageSize, hlog.GetDeviceSectorSize());
loaded = new CountdownEvent[frameSize];
loadedPage = new long[frameSize];
for (int i = 0; i < frameSize; i++)
loadedPage[i] = -1;

// Only load addresses flushed to disk
if (nextAddress < hlog.HeadAddress)
{
var frameNumber = (nextAddress >> hlog.LogPageSizeBits) % frameSize;
hlog.AsyncReadPagesFromDeviceToFrame
(nextAddress >> hlog.LogPageSizeBits,
1, endAddress, AsyncReadPagesCallback, Empty.Default,
frame, out loaded[frameNumber]);
}
}

/// <summary>
Expand All @@ -83,16 +77,18 @@ public unsafe bool GetNext(out Span<byte> entry)
while (true)
{
// Check for boundary conditions
if (currentAddress < allocator.BeginAddress)
{
Debug.WriteLine("Iterator address is less than log BeginAddress " + allocator.BeginAddress + ", adjusting iterator address");
currentAddress = allocator.BeginAddress;
}

if ((currentAddress >= endAddress) || (currentAddress >= allocator.ReadOnlyAddress))
{
entry = default(Span<byte>);
return false;
}

if (currentAddress < allocator.BeginAddress)
{
throw new Exception("Iterator address is less than log BeginAddress " + allocator.BeginAddress);
}

if (frameSize == 0 && currentAddress < allocator.HeadAddress)
{
Expand Down Expand Up @@ -154,7 +150,6 @@ public unsafe bool GetNext(out Span<byte> entry)
entry = _entry;
epoch.Suspend();
}
currentPhysicalAddress = physicalAddress;
nextAddress = currentAddress + recordSize;
return true;
}
Expand All @@ -170,26 +165,26 @@ public void Dispose()

private unsafe void BufferAndLoad(long currentAddress, long currentPage, long currentFrame)
{
if (first || (currentAddress & allocator.PageSizeMask) == 0)
if (loadedPage[currentFrame] != currentPage)
{
// Prefetch pages based on buffering mode
if (frameSize == 1)
{
if (!first)
{
allocator.AsyncReadPagesFromDeviceToFrame(currentAddress >> allocator.LogPageSizeBits, 1, endAddress, AsyncReadPagesCallback, Empty.Default, frame, out loaded[currentFrame]);
}
}
else
if (loadedPage[currentFrame] != -1)
loaded[currentFrame].Wait(); // Ensure we have completed ongoing load
allocator.AsyncReadPagesFromDeviceToFrame(currentAddress >> allocator.LogPageSizeBits, 1, endAddress, AsyncReadPagesCallback, Empty.Default, frame, out loaded[currentFrame]);
loadedPage[currentFrame] = currentAddress >> allocator.LogPageSizeBits;
}

if (frameSize == 2)
{
currentPage++;
currentFrame = (currentFrame + 1) % frameSize;

if (loadedPage[currentFrame] != currentPage)
{
var endPage = endAddress >> allocator.LogPageSizeBits;
if ((endPage > currentPage) &&
((endPage > currentPage + 1) || ((endAddress & allocator.PageSizeMask) != 0)))
{
allocator.AsyncReadPagesFromDeviceToFrame(1 + (currentAddress >> allocator.LogPageSizeBits), 1, endAddress, AsyncReadPagesCallback, Empty.Default, frame, out loaded[(currentPage + 1) % frameSize]);
}
if (loadedPage[currentFrame] != -1)
loaded[currentFrame].Wait(); // Ensure we have completed ongoing load
allocator.AsyncReadPagesFromDeviceToFrame(1 + (currentAddress >> allocator.LogPageSizeBits), 1, endAddress, AsyncReadPagesCallback, Empty.Default, frame, out loaded[currentFrame]);
loadedPage[currentFrame] = 1 + (currentAddress >> allocator.LogPageSizeBits);
}
first = false;
}
loaded[currentFrame].Wait();
}
Expand Down
48 changes: 48 additions & 0 deletions cs/src/core/Index/FasterLog/FasterLogSettings.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT license.

#pragma warning disable 0162

namespace FASTER.core.log
{
/// <summary>
/// FASTER Log Settings
/// </summary>
public class FasterLogSettings
{
/// <summary>
/// Device used for log
/// </summary>
public IDevice LogDevice = new NullDevice();

/// <summary>
/// Size of a segment (group of pages), in bits
/// </summary>
public int PageSizeBits = 22;

/// <summary>
/// Size of a segment (group of pages), in bits
/// </summary>
public int SegmentSizeBits = 30;

/// <summary>
/// Total size of in-memory part of log, in bits
/// </summary>
public int MemorySizeBits = 26;

internal LogSettings GetLogSettings()
{
return new LogSettings
{
LogDevice = LogDevice,
PageSizeBits = PageSizeBits,
SegmentSizeBits = SegmentSizeBits,
MemorySizeBits = MemorySizeBits,
CopyReadsToTail = false,
MutableFraction = 0,
ObjectLogDevice = null,
ReadCacheSettings = null
};
}
}
}

0 comments on commit 853b3ea

Please sign in to comment.