Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

API Coverage - Implement SINTER and SINTERSTORE (#180) #334

Merged
merged 11 commits into from
May 20, 2024
8 changes: 8 additions & 0 deletions libs/server/API/GarnetApiObjectCommands.cs
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,14 @@ public GarnetStatus SetDiff(ArgSlice[] keys, out HashSet<byte[]> members)
public GarnetStatus SetDiffStore(byte[] key, ArgSlice[] keys, out int count)
=> storageSession.SetDiffStore(key, keys, out count);

/// <inheritdoc />
public GarnetStatus SetIntersect(ArgSlice[] keys, out HashSet<byte[]> output)
=> storageSession.SetIntersect(keys, out output);

/// <inheritdoc />
public GarnetStatus SetIntersectStore(byte[] key, ArgSlice[] keys, out int count)
=> storageSession.SetIntersectStore(key, keys, out count);

#endregion

#region Hash Methods
Expand Down
9 changes: 9 additions & 0 deletions libs/server/API/GarnetWatchApi.cs
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,15 @@ public GarnetStatus SetUnion(ArgSlice[] keys, out HashSet<byte[]> output)
return garnetApi.SetUnion(keys, out output);
}

/// <inheritdoc />
public GarnetStatus SetIntersect(ArgSlice[] keys, out HashSet<byte[]> output)
{
foreach (var key in keys)
{
garnetApi.WATCH(key, StoreType.Object);
}
return garnetApi.SetIntersect(keys, out output);
}

/// <inheritdoc />
public GarnetStatus SetDiff(ArgSlice[] keys, out HashSet<byte[]> output)
Expand Down
19 changes: 19 additions & 0 deletions libs/server/API/IGarnetApi.cs
Original file line number Diff line number Diff line change
Expand Up @@ -556,6 +556,16 @@ public interface IGarnetApi : IGarnetReadApi, IGarnetAdvancedApi
/// <returns></returns>
GarnetStatus SetUnionStore(byte[] key, ArgSlice[] keys, out int count);

/// <summary>
/// This command is equal to SINTER, but instead of returning the resulting set, it is stored in destination.
/// If destination already exists, it is overwritten.
/// </summary>
/// <param name="key"></param>
/// <param name="keys"></param>
/// <param name="count"></param>
/// <returns></returns>
GarnetStatus SetIntersectStore(byte[] key, ArgSlice[] keys, out int count);

/// <summary>
/// This command is equal to SDIFF, but instead of returning the resulting set, it is stored in destination.
/// If destination already exists, it is overwritten.
Expand Down Expand Up @@ -1248,6 +1258,15 @@ public interface IGarnetReadApi
/// <returns></returns>
GarnetStatus SetUnion(ArgSlice[] keys, out HashSet<byte[]> output);

/// <summary>
/// Returns the members of the set resulting from the intersection of all the given sets.
/// Keys that do not exist are considered to be empty sets.
/// </summary>
/// <param name="keys"></param>
/// <param name="output"></param>
/// <returns></returns>
GarnetStatus SetIntersect(ArgSlice[] keys, out HashSet<byte[]> output);

/// <summary>
/// Returns the members of the set resulting from the difference between the first set and all the successive sets.
/// </summary>
Expand Down
2 changes: 2 additions & 0 deletions libs/server/Objects/Set/SetObject.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ public enum SetOperation : byte
SUNIONSTORE,
SDIFF,
SDIFFSTORE,
SINTER,
SINTERSTORE
}


Expand Down
123 changes: 123 additions & 0 deletions libs/server/Resp/Objects/SetCommands.cs
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,129 @@ private unsafe bool SetAdd<TGarnetApi>(int count, byte* ptr, ref TGarnetApi stor
return true;
}

/// <summary>
/// Returns the members of the set resulting from the intersection of all the given sets.
/// Keys that do not exist are considered to be empty sets.
/// </summary>
/// <param name="count"></param>
/// <param name="ptr"></param>
/// <param name="storageApi"></param>
/// <typeparam name="TGarnetApi"></typeparam>
/// <returns></returns>
private bool SetIntersect<TGarnetApi>(int count, byte* ptr, ref TGarnetApi storageApi)
where TGarnetApi : IGarnetApi
{
if (count < 1)
{
return AbortWithWrongNumberOfArguments("SINTER", count);
}

// Read all the keys
ArgSlice[] keys = new ArgSlice[count];

for (int i = 0; i < keys.Length; i++)
{
keys[i] = default;
if (!RespReadUtils.ReadPtrWithLengthHeader(ref keys[i].ptr, ref keys[i].length, ref ptr, recvBufferPtr + bytesRead))
return false;
}

if (NetworkKeyArraySlotVerify(ref keys, true))
{
var bufSpan = new ReadOnlySpan<byte>(recvBufferPtr, bytesRead);
if (!DrainCommands(bufSpan, count)) return false;
return true;
}

var status = storageApi.SetIntersect(keys, out var result);

if (status == GarnetStatus.OK)
{
// write the size of result
int resultCount = 0;
if (result != null)
{
resultCount = result.Count;
while (!RespWriteUtils.WriteArrayLength(resultCount, ref dcurr, dend))
SendAndReset();

foreach (var item in result)
{
while (!RespWriteUtils.WriteBulkString(item, ref dcurr, dend))
SendAndReset();
}
}
else
{
while (!RespWriteUtils.WriteArrayLength(resultCount, ref dcurr, dend))
SendAndReset();
}
}

// update read pointers
readHead = (int)(ptr - recvBufferPtr);
return true;
}

/// <summary>
/// This command is equal to SINTER, but instead of returning the resulting set, it is stored in destination.
/// If destination already exists, it is overwritten.
/// </summary>
/// <typeparam name="TGarnetApi"></typeparam>
/// <param name="count"></param>
/// <param name="ptr"></param>
/// <param name="storageApi"></param>
/// <returns></returns>
private bool SetIntersectStore<TGarnetApi>(int count, byte* ptr, ref TGarnetApi storageApi)
where TGarnetApi : IGarnetApi
{
if (count < 2)
{
return AbortWithWrongNumberOfArguments("SINTERSTORE", count);
}

// Get the key
if (!RespReadUtils.ReadByteArrayWithLengthHeader(out var key, ref ptr, recvBufferPtr + bytesRead))
return false;

if (NetworkSingleKeySlotVerify(key, false))
{
var bufSpan = new ReadOnlySpan<byte>(recvBufferPtr, bytesRead);
if (!DrainCommands(bufSpan, count))
return false;
return true;
}

var keys = new ArgSlice[count - 1];
for (var i = 0; i < count - 1; i++)
{
keys[i] = default;
if (!RespReadUtils.ReadPtrWithLengthHeader(ref keys[i].ptr, ref keys[i].length, ref ptr, recvBufferPtr + bytesRead))
return false;
}

if (NetworkKeyArraySlotVerify(ref keys, true))
{
var bufSpan = new ReadOnlySpan<byte>(recvBufferPtr, bytesRead);
if (!DrainCommands(bufSpan, count)) return false;
return true;
}

var status = storageApi.SetIntersectStore(key, keys, out var output);

if (status == GarnetStatus.OK)
{
while (!RespWriteUtils.WriteInteger(output, ref dcurr, dend))
SendAndReset();
}

// Move input head
readHead = (int)(ptr - recvBufferPtr);

return true;
}


/// <summary>
/// Returns the members of the set resulting from the union of all the given sets.
/// Keys that do not exist are considered to be empty sets.
Expand Down
8 changes: 8 additions & 0 deletions libs/server/Resp/RespCommand.cs
Original file line number Diff line number Diff line change
Expand Up @@ -708,6 +708,10 @@ static RespCommand MatchedNone(RespServerSession session, int oldReadHead)
{
return (RespCommand.Set, (byte)SetOperation.SUNION);
}
else if (*(ulong*)(ptr + 4) == MemoryMarshal.Read<ulong>("SINTER\r\n"u8))
{
return (RespCommand.Set, (byte)SetOperation.SINTER);
}
break;

case 'U':
Expand Down Expand Up @@ -915,6 +919,10 @@ static RespCommand MatchedNone(RespServerSession session, int oldReadHead)
{
return (RespCommand.Set, (byte)SetOperation.SUNIONSTORE);
}
else if (*(ulong*)(ptr + 2) == MemoryMarshal.Read<ulong>("1\r\nSINTE"u8) && *(ulong*)(ptr + 10) == MemoryMarshal.Read<ulong>("RSTORE\r\n"u8))
{
return (RespCommand.Set, (byte)SetOperation.SINTERSTORE);
}
break;

case 12:
Expand Down
76 changes: 76 additions & 0 deletions libs/server/Resp/RespCommandsInfo.json
Original file line number Diff line number Diff line change
Expand Up @@ -3887,6 +3887,38 @@
],
"SubCommands": null
},
{
"Command": "Set",
"ArrayCommand": 13,
"Name": "SINTER",
"IsInternal": false,
"Arity": -2,
"Flags": "ReadOnly",
"FirstKey": 1,
"LastKey": -1,
"Step": 1,
"AclCategories": "Read, Set, Slow",
"Tips": [
"nondeterministic_output_order"
],
"KeySpecifications": [
{
"BeginSearch": {
"TypeDiscriminator": "BeginSearchIndex",
"Index": 1
},
"FindKeys": {
"TypeDiscriminator": "FindKeysRange",
"LastKey": -1,
"KeyStep": 1,
"Limit": 0
},
"Notes": null,
"Flags": "RO, Access"
}
],
"SubCommands": null
},
{
"Command": "Set",
"ArrayCommand": 10,
Expand Down Expand Up @@ -3931,6 +3963,50 @@
],
"SubCommands": null
},
{
"Command": "Set",
"ArrayCommand": 14,
"Name": "SINTERSTORE",
"IsInternal": false,
"Arity": -3,
"Flags": "DenyOom, Write",
"FirstKey": 1,
"LastKey": -1,
"Step": 1,
"AclCategories": "Set, Slow, Write",
"Tips": null,
"KeySpecifications": [
{
"BeginSearch": {
"TypeDiscriminator": "BeginSearchIndex",
"Index": 1
},
"FindKeys": {
"TypeDiscriminator": "FindKeysRange",
"LastKey": 0,
"KeyStep": 1,
"Limit": 0
},
"Notes": null,
"Flags": "OW, Update"
},
{
"BeginSearch": {
"TypeDiscriminator": "BeginSearchIndex",
"Index": 2
},
"FindKeys": {
"TypeDiscriminator": "FindKeysRange",
"LastKey": -1,
"KeyStep": 1,
"Limit": 0
},
"Notes": null,
"Flags": "RO, Access"
}
],
"SubCommands": null
},
{
"Command": "TIME",
"ArrayCommand": null,
Expand Down
2 changes: 2 additions & 0 deletions libs/server/Resp/RespServerSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -529,6 +529,8 @@ private bool ProcessArrayCommands<TGarnetApi>(RespCommand cmd, byte subcmd, int
(RespCommand.Set, (byte)SetOperation.SUNIONSTORE) => SetUnionStore(count, ptr, ref storageApi),
(RespCommand.Set, (byte)SetOperation.SDIFF) => SetDiff(count, ptr, ref storageApi),
(RespCommand.Set, (byte)SetOperation.SDIFFSTORE) => SetDiffStore(count, ptr, ref storageApi),
(RespCommand.Set, (byte)SetOperation.SINTER) => SetIntersect(count, ptr, ref storageApi),
(RespCommand.Set, (byte)SetOperation.SINTERSTORE) => SetIntersectStore(count, ptr, ref storageApi),
_ => ProcessOtherCommands(cmd, subcmd, count, ref storageApi),
};
return success;
Expand Down