Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions libs/server/Resp/Vector/VectorManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ public sealed partial class VectorManager : IDisposable

internal const int IndexSizeBytes = Index.Size;
internal const long VADDAppendLogArg = long.MinValue;
internal const long DeleteAfterDropArg = VADDAppendLogArg + 1;
internal const long RecreateIndexArg = DeleteAfterDropArg + 1;
// DeleteAfterDropArg used to be here
internal const long RecreateIndexArg = VADDAppendLogArg + 2;
internal const long VREMAppendLogArg = RecreateIndexArg + 1;
internal const long MigrateElementKeyLogArg = VREMAppendLogArg + 1;
internal const long MigrateIndexKeyLogArg = MigrateElementKeyLogArg + 1;
Expand Down
24 changes: 10 additions & 14 deletions libs/server/Storage/Functions/MainStore/RMWMethods.cs
Original file line number Diff line number Diff line change
Expand Up @@ -812,20 +812,15 @@ private readonly IPUResult InPlaceUpdaterWorker(ref LogRecord logRecord, ref Str
case RespCommand.VADD:
// Adding to an existing VectorSet is modeled as a read operations
//
// However, we do synthesize some (pointless) writes to implement replication
// and a "make me delete=able"-update during drop.
// However, we do synthesize some (pointless) writes to implement replication (which we ignore here).
//
// Another "not quite write" is the recreate an index write operation
// that occurs if we're adding to an index that was restored from disk
// or a primary node.

// Handle "make me delete-able"
if (input.arg1 == VectorManager.DeleteAfterDropArg)
{
logRecord.ValueSpan.Clear();
}
else if (input.arg1 == VectorManager.RecreateIndexArg)
if (input.arg1 == VectorManager.RecreateIndexArg)
{
// Recreate index
var newIndexPtr = MemoryMarshal.Read<nint>(input.parseState.GetArgSliceByRef(11).Span);

functionsState.vectorManager.RecreateIndex(newIndexPtr, logRecord.ValueSpan);
Expand Down Expand Up @@ -1327,19 +1322,20 @@ public readonly bool CopyUpdater<TSourceLogRecord>(in TSourceLogRecord srcLogRec
break;

case RespCommand.VADD:
// Handle "make me delete-able"
if (input.arg1 == VectorManager.DeleteAfterDropArg)
{
dstLogRecord.ValueSpan.Clear();
}
else if (input.arg1 == VectorManager.RecreateIndexArg)
if (input.arg1 == VectorManager.RecreateIndexArg)
{
// Recreate index
var newIndexPtr = MemoryMarshal.Read<nint>(input.parseState.GetArgSliceByRef(11).Span);

oldValue.CopyTo(dstLogRecord.ValueSpan);

functionsState.vectorManager.RecreateIndex(newIndexPtr, dstLogRecord.ValueSpan);
}
else if (input.arg1 is VectorManager.VADDAppendLogArg or VectorManager.VREMAppendLogArg)
{
// VADD has triggered a CU of the index key - we want to do nothing but we have to copy to prevent corruption
oldValue.CopyTo(dstLogRecord.ValueSpan);
}

break;

Expand Down
6 changes: 6 additions & 0 deletions libs/server/Storage/Functions/MainStore/VarLenInputMethods.cs
Original file line number Diff line number Diff line change
Expand Up @@ -361,6 +361,12 @@ public RecordFieldInfo GetRMWModifiedFieldInfo<TSourceLogRecord>(in TSourceLogRe

case RespCommand.VADD:
case RespCommand.VREM:
if (input.arg1 is VectorManager.VADDAppendLogArg or VectorManager.VREMAppendLogArg)
{
// During replication we might trigger a CU, in which case... make sure there's space for the index we'll copy over
fieldInfo.ValueSize = VectorManager.IndexSize;
}
Comment thread
kevin-montrose marked this conversation as resolved.

return fieldInfo;

default:
Expand Down
Loading