Skip to content

Commit

Permalink
Merging from master
Browse files Browse the repository at this point in the history
  • Loading branch information
badrishc committed Dec 4, 2019
2 parents 6b03887 + 47d50d1 commit 876c0d3
Show file tree
Hide file tree
Showing 31 changed files with 390 additions and 68 deletions.
15 changes: 9 additions & 6 deletions cs/src/core/Allocator/AllocatorBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -352,7 +352,9 @@ public unsafe abstract partial class AllocatorBase<Key, Value> : IDisposable
/// <param name="result"></param>
/// <param name="device"></param>
/// <param name="objectLogDevice"></param>
protected abstract void WriteAsyncToDevice<TContext>(long startPage, long flushPage, int pageSize, IOCompletionCallback callback, PageAsyncFlushResult<TContext> result, IDevice device, IDevice objectLogDevice);
/// <param name="localSegmentOffsets"></param>
protected abstract void WriteAsyncToDevice<TContext>(long startPage, long flushPage, int pageSize, IOCompletionCallback callback, PageAsyncFlushResult<TContext> result, IDevice device, IDevice objectLogDevice, long[] localSegmentOffsets);

/// <summary>
/// Read objects to memory (async)
/// </summary>
Expand Down Expand Up @@ -517,11 +519,11 @@ public AllocatorBase(LogSettings settings, IFasterEqualityComparer<Key> comparer
SegmentBufferSize = 1 + (LogTotalSizeBytes / SegmentSize < 1 ? 1 : (int)(LogTotalSizeBytes / SegmentSize));

if (SegmentSize < PageSize)
throw new Exception("Segment must be at least of page size");
throw new FasterException("Segment must be at least of page size");

if (BufferSize < 1)
{
throw new Exception("Log buffer must be of size at least 1 page");
throw new FasterException("Log buffer must be of size at least 1 page");
}

PageStatusIndicator = new FullPageStatus[BufferSize];
Expand Down Expand Up @@ -706,7 +708,7 @@ public int GetDeviceSectorSize()
public long TryAllocate(int numSlots = 1)
{
if (numSlots > PageSize)
throw new Exception("Entry does not fit on page");
throw new FasterException("Entry does not fit on page");

PageOffset localTailPageOffset = default(PageOffset);

Expand Down Expand Up @@ -898,7 +900,7 @@ public void OnPagesClosed(long newSafeHeadAddress)
ShiftClosedUntilAddress();
if (ClosedUntilAddress > FlushedUntilAddress)
{
throw new Exception($"Closed address {ClosedUntilAddress} exceeds flushed address {FlushedUntilAddress}");
throw new FasterException($"Closed address {ClosedUntilAddress} exceeds flushed address {FlushedUntilAddress}");
}
}
}
Expand Down Expand Up @@ -1392,6 +1394,7 @@ public void AsyncFlushPagesToDevice(long startPage, long endPage, long endLogica
completedSemaphore = completedSemaphore,
count = totalNumPages
};
var localSegmentOffsets = new long[SegmentBufferSize];

for (long flushPage = startPage; flushPage < endPage; flushPage++)
{
Expand All @@ -1402,7 +1405,7 @@ public void AsyncFlushPagesToDevice(long startPage, long endPage, long endLogica
pageSize = (int)(endLogicalAddress - (flushPage << LogPageSizeBits));

// Intended destination is flushPage
WriteAsyncToDevice(startPage, flushPage, pageSize, AsyncFlushPageToDeviceCallback, asyncResult, device, objectLogDevice);
WriteAsyncToDevice(startPage, flushPage, pageSize, AsyncFlushPageToDeviceCallback, asyncResult, device, objectLogDevice, localSegmentOffsets);
}
}

Expand Down
2 changes: 1 addition & 1 deletion cs/src/core/Allocator/AtomicOwner.cs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ public bool Release()
return false;

if (newer.owner == 0)
throw new Exception("Invalid release by non-owner thread");
throw new FasterException("Invalid release by non-owner thread");
newer.owner = 0;

if (Interlocked.CompareExchange(ref this.atomic, newer.atomic, older.atomic) == older.atomic)
Expand Down
4 changes: 2 additions & 2 deletions cs/src/core/Allocator/BlittableAllocator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ protected override void WriteAsync<TContext>(long flushPage, IOCompletionCallbac

protected override void WriteAsyncToDevice<TContext>
(long startPage, long flushPage, int pageSize, IOCompletionCallback callback,
PageAsyncFlushResult<TContext> asyncResult, IDevice device, IDevice objectLogDevice)
PageAsyncFlushResult<TContext> asyncResult, IDevice device, IDevice objectLogDevice, long[] localSegmentOffsets)
{
var alignedPageSize = (pageSize + (sectorSize - 1)) & ~(sectorSize - 1);

Expand Down Expand Up @@ -305,7 +305,7 @@ public override long[] GetSegmentOffsets()

internal override void PopulatePage(byte* src, int required_bytes, long destinationPage)
{
throw new Exception("BlittableAllocator memory pages are sector aligned - use direct copy");
throw new FasterException("BlittableAllocator memory pages are sector aligned - use direct copy");
// Buffer.MemoryCopy(src, (void*)pointers[destinationPage % BufferSize], required_bytes, required_bytes);
}

Expand Down
4 changes: 2 additions & 2 deletions cs/src/core/Allocator/BlittableScanIterator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -110,12 +110,12 @@ public bool GetNext(out RecordInfo recordInfo)

if (currentAddress < hlog.BeginAddress)
{
throw new Exception("Iterator address is less than log BeginAddress " + hlog.BeginAddress);
throw new FasterException("Iterator address is less than log BeginAddress " + hlog.BeginAddress);
}

if (frameSize == 0 && currentAddress < hlog.HeadAddress)
{
throw new Exception("Iterator address is less than log HeadAddress in memory-scan mode");
throw new FasterException("Iterator address is less than log HeadAddress in memory-scan mode");
}

var currentPage = currentAddress >> hlog.LogPageSizeBits;
Expand Down
14 changes: 7 additions & 7 deletions cs/src/core/Allocator/GenericAllocator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,12 @@ public GenericAllocator(LogSettings settings, SerializerSettings<Key, Value> ser

if ((!keyBlittable) && (settings.LogDevice as NullDevice == null) && ((SerializerSettings == null) || (SerializerSettings.keySerializer == null)))
{
throw new Exception("Key is not blittable, but no serializer specified via SerializerSettings");
throw new FasterException("Key is not blittable, but no serializer specified via SerializerSettings");
}

if ((!valueBlittable) && (settings.LogDevice as NullDevice == null) && ((SerializerSettings == null) || (SerializerSettings.valueSerializer == null)))
{
throw new Exception("Value is not blittable, but no serializer specified via SerializerSettings");
throw new FasterException("Value is not blittable, but no serializer specified via SerializerSettings");
}

values = new Record<Key, Value>[BufferSize][];
Expand All @@ -64,7 +64,7 @@ public GenericAllocator(LogSettings settings, SerializerSettings<Key, Value> ser
if ((settings.LogDevice as NullDevice == null) && (KeyHasObjects() || ValueHasObjects()))
{
if (objectLogDevice == null)
throw new Exception("Objects in key/value, but object log not provided during creation of FASTER instance");
throw new FasterException("Objects in key/value, but object log not provided during creation of FASTER instance");
}
}

Expand Down Expand Up @@ -240,13 +240,13 @@ protected override void WriteAsync<TContext>(long flushPage, IOCompletionCallbac

protected override void WriteAsyncToDevice<TContext>
(long startPage, long flushPage, int pageSize, IOCompletionCallback callback,
PageAsyncFlushResult<TContext> asyncResult, IDevice device, IDevice objectLogDevice)
PageAsyncFlushResult<TContext> asyncResult, IDevice device, IDevice objectLogDevice, long[] localSegmentOffsets)
{
// We are writing to separate device, so use fresh segment offsets
WriteAsync(flushPage,
(ulong)(AlignedPageSizeBytes * (flushPage - startPage)),
(uint)pageSize, callback, asyncResult,
device, objectLogDevice, flushPage, new long[SegmentBufferSize]);
device, objectLogDevice, flushPage, localSegmentOffsets);
}


Expand Down Expand Up @@ -559,7 +559,7 @@ private void AsyncReadPageWithObjectsCallback<TContext>(uint errorCode, uint num
Debug.Assert(startptr % sectorSize == 0);

if (size > int.MaxValue)
throw new Exception("Unable to read object page, total size greater than 2GB: " + size);
throw new FasterException("Unable to read object page, total size greater than 2GB: " + size);

var alignedLength = (size + (sectorSize - 1)) & ~(sectorSize - 1);
var objBuffer = bufferPool.Get((int)alignedLength);
Expand Down Expand Up @@ -872,7 +872,7 @@ protected override bool RetrievedFullRecord(byte* record, ref AsyncIOContext<Key

// We are limited to a 2GB size per key-value
if (endAddress-startAddress > int.MaxValue)
throw new Exception("Size of key-value exceeds max of 2GB: " + (endAddress - startAddress));
throw new FasterException("Size of key-value exceeds max of 2GB: " + (endAddress - startAddress));

AsyncGetFromDisk(startAddress, (int)(endAddress - startAddress), ctx, ctx.record);
return false;
Expand Down
4 changes: 2 additions & 2 deletions cs/src/core/Allocator/GenericScanIterator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -116,12 +116,12 @@ public bool GetNext(out RecordInfo recordInfo)

if (currentAddress < hlog.BeginAddress)
{
throw new Exception("Iterator address is less than log BeginAddress " + hlog.BeginAddress);
throw new FasterException("Iterator address is less than log BeginAddress " + hlog.BeginAddress);
}

if (frameSize == 0 && currentAddress < hlog.HeadAddress)
{
throw new Exception("Iterator address is less than log HeadAddress in memory-scan mode");
throw new FasterException("Iterator address is less than log HeadAddress in memory-scan mode");
}

var currentPage = currentAddress >> hlog.LogPageSizeBits;
Expand Down
4 changes: 2 additions & 2 deletions cs/src/core/Allocator/MallocFixedPageSize.cs
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ public long GetPhysicalAddress(long address)
public ref T Get(long index)
{
if (this.ReturnPhysicalAddress)
throw new Exception("Physical pointer returned by allocator: de-reference pointer to get records instead of calling Get");
throw new FasterException("Physical pointer returned by allocator: de-reference pointer to get records instead of calling Get");

return ref values
[index >> PageSizeBits]
Expand All @@ -155,7 +155,7 @@ public ref T Get(long index)
public void Set(long index, ref T value)
{
if (this.ReturnPhysicalAddress)
throw new Exception("Physical pointer returned by allocator: de-reference pointer to set records instead of calling Set (otherwise, set ForceUnpinnedAllocation to true)");
throw new FasterException("Physical pointer returned by allocator: de-reference pointer to set records instead of calling Set (otherwise, set ForceUnpinnedAllocation to true)");

values
[index >> PageSizeBits]
Expand Down
2 changes: 1 addition & 1 deletion cs/src/core/Allocator/PendingFlushList.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public void Add(PageAsyncFlushResult<Empty> t)
}
}
} while (retries++ < maxRetries);
throw new Exception("Unable to add item to list");
throw new FasterException("Unable to add item to list");
}

public bool RemoveAdjacent(long address, out PageAsyncFlushResult<Empty> request)
Expand Down
4 changes: 2 additions & 2 deletions cs/src/core/Allocator/VarLenBlittableAllocator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ protected override void WriteAsync<TContext>(long flushPage, IOCompletionCallbac

protected override void WriteAsyncToDevice<TContext>
(long startPage, long flushPage, int pageSize, IOCompletionCallback callback,
PageAsyncFlushResult<TContext> asyncResult, IDevice device, IDevice objectLogDevice)
PageAsyncFlushResult<TContext> asyncResult, IDevice device, IDevice objectLogDevice, long[] localSegmentOffsets)
{
var alignedPageSize = (pageSize + (sectorSize - 1)) & ~(sectorSize - 1);

Expand Down Expand Up @@ -411,7 +411,7 @@ public override long[] GetSegmentOffsets()

internal override void PopulatePage(byte* src, int required_bytes, long destinationPage)
{
throw new Exception("BlittableAllocator memory pages are sector aligned - use direct copy");
throw new FasterException("BlittableAllocator memory pages are sector aligned - use direct copy");
// Buffer.MemoryCopy(src, (void*)pointers[destinationPage % BufferSize], required_bytes, required_bytes);
}

Expand Down
4 changes: 2 additions & 2 deletions cs/src/core/Allocator/VarLenBlittableScanIterator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -110,12 +110,12 @@ public bool GetNext(out RecordInfo recordInfo)

if (currentAddress < hlog.BeginAddress)
{
throw new Exception("Iterator address is less than log BeginAddress " + hlog.BeginAddress);
throw new FasterException("Iterator address is less than log BeginAddress " + hlog.BeginAddress);
}

if (frameSize == 0 && currentAddress < hlog.HeadAddress)
{
throw new Exception("Iterator address is less than log HeadAddress in memory-scan mode");
throw new FasterException("Iterator address is less than log HeadAddress in memory-scan mode");
}

var currentPage = currentAddress >> hlog.LogPageSizeBits;
Expand Down
15 changes: 12 additions & 3 deletions cs/src/core/Device/FixedPool.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,17 +26,26 @@ public FixedPool(int size, Func<T> creator)
{
while (true)
{
for (int i=0; i<size; i++)
for (int i = 0; i < size; i++)
{
if (disposed)
throw new Exception("Disposed");
throw new FasterException("Disposed");

var val = owners[i];
if (val == 0)
{
if (Interlocked.CompareExchange(ref owners[i], 2, val) == val)
{
items[i] = creator();
try
{
items[i] = creator();
}
catch
{
Interlocked.Exchange(ref owners[i], val);
throw;
}

return (items[i], i);
}
}
Expand Down
2 changes: 1 addition & 1 deletion cs/src/core/Device/LocalStorageDevice.cs
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,7 @@ private SafeFileHandle CreateHandle(int segmentId)
}
catch (Exception e)
{
throw new Exception("Error binding log handle for " + GetSegmentName(segmentId) + ": " + e.ToString());
throw new FasterException("Error binding log handle for " + GetSegmentName(segmentId) + ": " + e.ToString());
}
return logHandle;
}
Expand Down
2 changes: 1 addition & 1 deletion cs/src/core/Device/StorageDeviceBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ public virtual void Initialize(long segmentSize, LightEpoch epoch = null)
if (!Utility.IsPowerOfTwo(segmentSize))
{
if (segmentSize != -1)
throw new Exception("Invalid segment size: " + segmentSize);
throw new FasterException("Invalid segment size: " + segmentSize);
segmentSizeBits = 64;
segmentSizeMask = ~0UL;
}
Expand Down
2 changes: 1 addition & 1 deletion cs/src/core/Epochs/FastThreadLocal.cs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public FastThreadLocal()
return;
}
}
throw new Exception("Unsupported number of simultaneous instances");
throw new FasterException("Unsupported number of simultaneous instances");
}

public void InitializeThread()
Expand Down
2 changes: 1 addition & 1 deletion cs/src/core/Epochs/LightEpoch.cs
Original file line number Diff line number Diff line change
Expand Up @@ -380,7 +380,7 @@ private static int ReserveEntry(int startIndex, int threadId)

if (current_iteration > (kTableSize * 10))
{
throw new Exception("Unable to reserve an epoch entry, try increasing the epoch table size (kTableSize)");
throw new FasterException("Unable to reserve an epoch entry, try increasing the epoch table size (kTableSize)");
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions cs/src/core/Index/Common/AddressInfo.cs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public long Size
multiplier = 1;
if (val >= (1 << kSizeBits))
{
throw new Exception("Unsupported object size: " + value);
throw new FasterException("Unsupported object size: " + value);
}
}
var _word = (long)word;
Expand All @@ -87,7 +87,7 @@ public long Address
word = (IntPtr)_word;
if (value != Address)
{
throw new Exception("Overflow in AddressInfo" + ((kAddressBits < 64) ? " - consider running the program in x64 mode for larger address space support" : ""));
throw new FasterException("Overflow in AddressInfo" + ((kAddressBits < 64) ? " - consider running the program in x64 mode for larger address space support" : ""));
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions cs/src/core/Index/Common/Contexts.cs
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,7 @@ internal void Recover(Guid token, ICheckpointManager checkpointManager)
{
var metadata = checkpointManager.GetLogCommitMetadata(token);
if (metadata == null)
throw new Exception("Invalid log commit metadata for ID " + token.ToString());
throw new FasterException("Invalid log commit metadata for ID " + token.ToString());

using (var s = new StreamReader(new MemoryStream(metadata)))
Initialize(s);
Expand Down Expand Up @@ -446,7 +446,7 @@ public void Recover(Guid guid, ICheckpointManager checkpointManager)
{
var metadata = checkpointManager.GetIndexCommitMetadata(guid);
if (metadata == null)
throw new Exception("Invalid index commit metadata for ID " + guid.ToString());
throw new FasterException("Invalid index commit metadata for ID " + guid.ToString());
using (var s = new StreamReader(new MemoryStream(metadata)))
Initialize(s);
}
Expand Down
2 changes: 1 addition & 1 deletion cs/src/core/Index/FASTER/FASTER.cs
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ public FasterKV(long size, Functions functions, LogSettings logSettings, Checkpo
checkpointSettings = new CheckpointSettings();

if (checkpointSettings.CheckpointDir != null && checkpointSettings.CheckpointManager != null)
throw new Exception("Specify either CheckpointManager or CheckpointDir for CheckpointSettings, not both");
throw new FasterException("Specify either CheckpointManager or CheckpointDir for CheckpointSettings, not both");

checkpointManager = checkpointSettings.CheckpointManager ?? new LocalCheckpointManager(checkpointSettings.CheckpointDir ?? "");

Expand Down
2 changes: 1 addition & 1 deletion cs/src/core/Index/FASTER/FASTERBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -754,7 +754,7 @@ protected virtual string _DumpDistribution(int version)
var x = default(HashBucketEntry);
x.word = b.bucket_entries[bucket_entry];
if (tags.Contains(x.Tag) && !x.Tentative)
throw new Exception("Duplicate tag found in index");
throw new FasterException("Duplicate tag found in index");
tags.Add(x.Tag);
++cnt;
++total_record_count;
Expand Down
Loading

0 comments on commit 876c0d3

Please sign in to comment.