Skip to content

Commit

Permalink
Fixing deletes to handle CPR operation status.
Browse files Browse the repository at this point in the history
  • Loading branch information
badrishc committed Apr 21, 2020
1 parent 0de60be commit d8aaee5
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 17 deletions.
6 changes: 5 additions & 1 deletion cs/src/core/Index/FASTER/FASTER.cs
Original file line number Diff line number Diff line change
Expand Up @@ -356,11 +356,15 @@ internal Status ContextDelete(ref Key key, Context context, long serialNo, Faste
{
var pcontext = default(PendingContext);
var internalStatus = InternalDelete(ref key, ref context, ref pcontext, sessionCtx, serialNo);
var status = default(Status);
Status status;
if (internalStatus == OperationStatus.SUCCESS || internalStatus == OperationStatus.NOTFOUND)
{
status = (Status) internalStatus;
}
else
{
status = HandleOperationStatus(sessionCtx, sessionCtx, pcontext, internalStatus);
}

sessionCtx.serialNum = serialNo;
return status;
Expand Down
20 changes: 4 additions & 16 deletions cs/src/core/Index/FASTER/FASTERImpl.cs
Original file line number Diff line number Diff line change
Expand Up @@ -864,7 +864,6 @@ internal OperationStatus InternalDelete(
var logicalAddress = Constants.kInvalidAddress;
var physicalAddress = default(long);
var latchOperation = default(LatchOperation);
var version = default(int);
var latestRecordVersion = -1;

var hash = comparer.GetHashCode64(ref key);
Expand Down Expand Up @@ -902,26 +901,18 @@ internal OperationStatus InternalDelete(
}
#endregion

// NO optimization for most common case
//if (sessionCtx.phase == Phase.REST && logicalAddress >= hlog.ReadOnlyAddress)
//{
// hlog.GetInfo(physicalAddress).Tombstone = true;
// return OperationStatus.SUCCESS;
//}

#region Entry latch operation
if (sessionCtx.phase != Phase.REST)
{
switch (sessionCtx.phase)
{
case Phase.PREPARE:
{
version = sessionCtx.version;
if (HashBucket.TryAcquireSharedLatch(bucket))
{
// Set to release shared latch (default)
latchOperation = LatchOperation.Shared;
if (latestRecordVersion != -1 && latestRecordVersion > version)
if (latestRecordVersion != -1 && latestRecordVersion > sessionCtx.version)
{
status = OperationStatus.CPR_SHIFT_DETECTED;
goto CreatePendingContext; // Pivot Thread
Expand All @@ -936,8 +927,7 @@ internal OperationStatus InternalDelete(
}
case Phase.IN_PROGRESS:
{
version = (sessionCtx.version - 1);
if (latestRecordVersion != -1 && latestRecordVersion <= version)
if (latestRecordVersion != -1 && latestRecordVersion < sessionCtx.version)
{
if (HashBucket.TryAcquireExclusiveLatch(bucket))
{
Expand All @@ -955,8 +945,7 @@ internal OperationStatus InternalDelete(
}
case Phase.WAIT_PENDING:
{
version = (sessionCtx.version - 1);
if (latestRecordVersion != -1 && latestRecordVersion <= version)
if (latestRecordVersion != -1 && latestRecordVersion < sessionCtx.version)
{
if (HashBucket.NoSharedLatches(bucket))
{
Expand All @@ -972,8 +961,7 @@ internal OperationStatus InternalDelete(
}
case Phase.WAIT_FLUSH:
{
version = (sessionCtx.version - 1);
if (latestRecordVersion != -1 && latestRecordVersion <= version)
if (latestRecordVersion != -1 && latestRecordVersion < sessionCtx.version)
{
goto CreateNewRecord; // Create a (v+1) record
}
Expand Down

0 comments on commit d8aaee5

Please sign in to comment.