Skip to content

Commit

Permalink
Refactor string encoding to be consistent and more memory efficient (#…
Browse files Browse the repository at this point in the history
…223)

* Remove intermediate array allocations from ASCII decoding

* Update RespWriteUtils to take spans instead of arrays

* Let's not encourage allocations

* Merge WriteResponse with WriteDirect

* They're the samething aka "TryCopyTo" if it was BCL naming

* Remove intermediate array allocations from ASCII string encoding

* Add WriteAsciiDirect to RespWriteUtils

* Update ASCII encoding logic to increment the curr pointer with the "actual" encoded byte count, just to be sure.

* Rename WriteBulkString to WriteAsciiBulkString

* Makes it clear what was the assumption for the input string

* Make all simple string encoding variants behave consistently

* Either they should all assume the input to be ASCII, or none of them, or there should be explicit overloads for "unsafe" assumptions.

* Remove intermediate array allocations from ASCII string encoding

* Now using the WriterDirectAscii

* Remove more intermediate array allocations from ASCII string encoding

* Removed unnecessary explicit ROSpan<byte> casts from arrays

* Remove more intermediate array allocations from ASCII string encoding

* Skip encoding of constant strings by making them u8-literals

* Remove more intermediate array allocations from ASCII string encoding

* Remove couple allocations from WriteSimpleString calls

* Remove more intermediate array allocations from ASCII string encoding

* Skip encoding of constant strings by making them u8-literals

* Fix merge

* Add WriteUtf8BulkString overload to RespWriteUtils

* Add simple regression test for unicode SET value

* and watch as tests hang

* Fix GarnetClient encoding of unicode bulk strings as keys or values

* Add missing newline to generic error response in ReplicaOfCommand

* Do not repeat the error message in PrimarySendCheckpoint
  • Loading branch information
PaulusParssinen committed Apr 5, 2024
1 parent fed14fe commit f2ee39a
Show file tree
Hide file tree
Showing 45 changed files with 495 additions and 472 deletions.
38 changes: 19 additions & 19 deletions benchmark/Resp.benchmark/RespOnlineBench.cs
Original file line number Diff line number Diff line change
Expand Up @@ -434,39 +434,39 @@ public unsafe void OpRunnerLightClient(int thread_id)

byte[] pingBufferAllocation = GC.AllocateArray<byte>(14, true);
byte* pingBuffer = (byte*)Unsafe.AsPointer(ref pingBufferAllocation[0]);
new ReadOnlySpan<byte>(Encoding.ASCII.GetBytes("*1\r\n$4\r\nPING\r\n")).CopyTo(new Span<byte>(pingBuffer, 14));
"*1\r\n$4\r\nPING\r\n"u8.CopyTo(new Span<byte>(pingBuffer, 14));

byte[] getBufferA = GC.AllocateArray<byte>(13 + RespWriteUtils.GetBulkStringLength(keyLen), true);
byte* getBuffer = (byte*)Unsafe.AsPointer(ref getBufferA[0]);
new ReadOnlySpan<byte>(Encoding.ASCII.GetBytes("*2\r\n$3\r\nGET\r\n")).CopyTo(new Span<byte>(getBuffer, 13));
"*2\r\n$3\r\nGET\r\n"u8.CopyTo(new Span<byte>(getBuffer, 13));

byte[] setBufferA = GC.AllocateArray<byte>(130 + RespWriteUtils.GetBulkStringLength(keyLen) + RespWriteUtils.GetBulkStringLength(valueLen), true);
byte* setBuffer = (byte*)Unsafe.AsPointer(ref setBufferA[0]);
new ReadOnlySpan<byte>(Encoding.ASCII.GetBytes("*3\r\n$3\r\nSET\r\n")).CopyTo(new Span<byte>(setBuffer, 13));
"*3\r\n$3\r\nSET\r\n"u8.CopyTo(new Span<byte>(setBuffer, 13));

byte[] setexBufferA = GC.AllocateArray<byte>(15 + RespWriteUtils.GetBulkStringLength(keyLen) + RespWriteUtils.GetIntegerAsBulkStringLength(opts.Ttl) + RespWriteUtils.GetBulkStringLength(valueLen), true);
byte* setexBuffer = (byte*)Unsafe.AsPointer(ref setexBufferA[0]);
new ReadOnlySpan<byte>(Encoding.ASCII.GetBytes("*4\r\n$5\r\nSETEX\r\n")).CopyTo(new Span<byte>(setexBuffer, 15));
"*4\r\n$5\r\nSETEX\r\n"u8.CopyTo(new Span<byte>(setexBuffer, 15));

byte[] delBufferA = GC.AllocateArray<byte>(13 + RespWriteUtils.GetBulkStringLength(keyLen), true);
byte* delBuffer = (byte*)Unsafe.AsPointer(ref delBufferA[0]);
new ReadOnlySpan<byte>(Encoding.ASCII.GetBytes("*2\r\n$3\r\nDEL\r\n")).CopyTo(new Span<byte>(delBuffer, 13));
"*2\r\n$3\r\nDEL\r\n"u8.CopyTo(new Span<byte>(delBuffer, 13));

byte[] zaddBufferA = GC.AllocateArray<byte>(14 + RespWriteUtils.GetBulkStringLength(sskeyLen) + RespWriteUtils.GetIntegerAsBulkStringLength(1) + RespWriteUtils.GetBulkStringLength(keyLen), true);
byte* zaddBuffer = (byte*)Unsafe.AsPointer(ref zaddBufferA[0]);
new ReadOnlySpan<byte>(Encoding.ASCII.GetBytes("*4\r\n$4\r\nZADD\r\n")).CopyTo(new Span<byte>(zaddBuffer, 14));
"*4\r\n$4\r\nZADD\r\n"u8.CopyTo(new Span<byte>(zaddBuffer, 14));

byte[] zremBufferA = GC.AllocateArray<byte>(14 + RespWriteUtils.GetBulkStringLength(sskeyLen) + RespWriteUtils.GetBulkStringLength(keyLen), true);
byte* zremBuffer = (byte*)Unsafe.AsPointer(ref zremBufferA[0]);
new ReadOnlySpan<byte>(Encoding.ASCII.GetBytes("*3\r\n$4\r\nZREM\r\n")).CopyTo(new Span<byte>(zremBuffer, 14));
"*3\r\n$4\r\nZREM\r\n"u8.CopyTo(new Span<byte>(zremBuffer, 14));

byte[] zcardBufferAllocation = GC.AllocateArray<byte>(15 + RespWriteUtils.GetBulkStringLength(sskeyLen), true);
byte* zcardBuffer = (byte*)Unsafe.AsPointer(ref zcardBufferAllocation[0]);
new ReadOnlySpan<byte>(Encoding.ASCII.GetBytes("*2\r\n$4\r\nZCARD\r\n")).CopyTo(new Span<byte>(zcardBuffer, 15));
"*2\r\n$4\r\nZCARD\r\n"u8.CopyTo(new Span<byte>(zcardBuffer, 15));

byte[] expireBufferA = GC.AllocateArray<byte>(16 + RespWriteUtils.GetBulkStringLength(sskeyLen) + RespWriteUtils.GetIntegerAsBulkStringLength(opts.Ttl), true);
byte* expireBuffer = (byte*)Unsafe.AsPointer(ref expireBufferA[0]);
new ReadOnlySpan<byte>(Encoding.ASCII.GetBytes("*3\r\n$6\r\nEXPIRE\r\n")).CopyTo(new Span<byte>(expireBuffer, 16));
"*3\r\n$6\r\nEXPIRE\r\n"u8.CopyTo(new Span<byte>(expireBuffer, 16));

byte* getEnd = getBuffer + 13 + RespWriteUtils.GetBulkStringLength(keyLen);
byte* setEnd = setBuffer + 130 + RespWriteUtils.GetBulkStringLength(keyLen) + RespWriteUtils.GetBulkStringLength(valueLen);
Expand Down Expand Up @@ -496,58 +496,58 @@ public unsafe void OpRunnerLightClient(int thread_id)
break;
case OpType.GET:
byte* getCurr = getBuffer + 13;
RespWriteUtils.WriteBulkString(Encoding.ASCII.GetBytes(req.GenerateKey()), ref getCurr, getEnd);
RespWriteUtils.WriteAsciiBulkString(req.GenerateKey(), ref getCurr, getEnd);
client.Send(getBuffer, (int)(getCurr - getBuffer), 1);
client.CompletePendingRequests();
break;
case OpType.SET:
byte* setCurr = setBuffer + 13;
RespWriteUtils.WriteBulkString(Encoding.ASCII.GetBytes(req.GenerateKey()), ref setCurr, setEnd);
RespWriteUtils.WriteAsciiBulkString(req.GenerateKey(), ref setCurr, setEnd);
RespWriteUtils.WriteBulkString(req.GenerateValueBytes().Span, ref setCurr, setEnd);
client.Send(setBuffer, (int)(setCurr - setBuffer), 1);
client.CompletePendingRequests();
break;
case OpType.SETEX:
byte* setexCurr = setexBuffer + 15;
RespWriteUtils.WriteBulkString(Encoding.ASCII.GetBytes(req.GenerateKey()), ref setexCurr, setexEnd);
RespWriteUtils.WriteAsciiBulkString(req.GenerateKey(), ref setexCurr, setexEnd);
RespWriteUtils.WriteIntegerAsBulkString(opts.Ttl, ref setexCurr, setexEnd);
RespWriteUtils.WriteBulkString(req.GenerateValueBytes().Span, ref setexCurr, setexEnd);
client.Send(setexBuffer, (int)(setexCurr - setexBuffer), 1);
client.CompletePendingRequests();
break;
case OpType.DEL:
byte* delCurr = delBuffer + 13;
RespWriteUtils.WriteBulkString(Encoding.ASCII.GetBytes(req.GenerateKey()), ref delCurr, delEnd);
RespWriteUtils.WriteAsciiBulkString(req.GenerateKey(), ref delCurr, delEnd);
client.Send(delBuffer, (int)(delCurr - delBuffer), 1);
client.CompletePendingRequests();
break;
case OpType.ZADD:
byte* zaddCurr = zaddBuffer + 14;
RespWriteUtils.WriteBulkString(Encoding.ASCII.GetBytes(sskey), ref zaddCurr, zaddEnd);
RespWriteUtils.WriteAsciiBulkString(sskey, ref zaddCurr, zaddEnd);
RespWriteUtils.WriteIntegerAsBulkString(1, ref zaddCurr, zaddEnd);
RespWriteUtils.WriteBulkString(Encoding.ASCII.GetBytes(req.GenerateKey()), ref zaddCurr, zaddEnd);
RespWriteUtils.WriteAsciiBulkString(req.GenerateKey(), ref zaddCurr, zaddEnd);
client.Send(zaddBuffer, (int)(zaddCurr - zaddBuffer), 1);
if (opts.Ttl > 0)
{
// NOTE: Here we are not resetting opType. This only works for online bench
byte* expireCurr = expireBuffer + 16;
RespWriteUtils.WriteBulkString(Encoding.ASCII.GetBytes(sskey), ref expireCurr, expireEnd);
RespWriteUtils.WriteAsciiBulkString(sskey, ref expireCurr, expireEnd);
RespWriteUtils.WriteIntegerAsBulkString(opts.Ttl, ref expireCurr, expireEnd);
client.Send(expireBuffer, (int)(expireCurr - expireBuffer), 1);
}
client.CompletePendingRequests();
break;
case OpType.ZREM:
byte* zremCurr = zremBuffer + 14;
RespWriteUtils.WriteBulkString(Encoding.ASCII.GetBytes(sskey), ref zremCurr, zremEnd);
RespWriteUtils.WriteAsciiBulkString(sskey, ref zremCurr, zremEnd);
RespWriteUtils.WriteIntegerAsBulkString(1, ref zremCurr, zremEnd);
RespWriteUtils.WriteBulkString(Encoding.ASCII.GetBytes(req.GenerateKey()), ref zremCurr, zremEnd);
RespWriteUtils.WriteAsciiBulkString(req.GenerateKey(), ref zremCurr, zremEnd);
client.Send(zremBuffer, (int)(zremCurr - zremEnd), 1);
client.CompletePendingRequests();
break;
case OpType.ZCARD:
byte* zcardCurr = zcardBuffer + 15;
RespWriteUtils.WriteBulkString(Encoding.ASCII.GetBytes(sskey), ref zcardCurr, zcardEnd);
RespWriteUtils.WriteAsciiBulkString(sskey, ref zcardCurr, zcardEnd);
client.Send(zcardBuffer, (int)(zcardCurr - zcardEnd), 1);
client.CompletePendingRequests();
break;
Expand Down
8 changes: 4 additions & 4 deletions libs/client/ClientSession/GarnetClientSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ public void ExecuteClusterAppendLog(string nodeId, long previousAddress, long cu
}
offset = curr;

while (!RespWriteUtils.WriteBulkString(nodeId, ref curr, end))
while (!RespWriteUtils.WriteAsciiBulkString(nodeId, ref curr, end))
{
Flush();
curr = offset;
Expand Down Expand Up @@ -311,7 +311,7 @@ private void InternalExecute(params string[] command)

foreach (var cmd in command)
{
while (!RespWriteUtils.WriteBulkString(cmd, ref curr, end))
while (!RespWriteUtils.WriteAsciiBulkString(cmd, ref curr, end))
{
Flush();
curr = offset;
Expand All @@ -325,7 +325,7 @@ private void InternalExecute(params string[] command)

private int ProcessReplies(byte* recvBufferPtr, int bytesRead)
{
// Debug.WriteLine("RECV: [" + Encoding.UTF8.GetString(new Span<byte>(recvBufferPtr, bytesRead).ToArray()).Replace("\n", "|").Replace("\r", "") + "]");
// Debug.WriteLine("RECV: [" + Encoding.UTF8.GetString(new Span<byte>(recvBufferPtr, bytesRead)).Replace("\n", "|").Replace("\r", "") + "]");

string result = null;
string[] resultArray = null;
Expand Down Expand Up @@ -366,7 +366,7 @@ private int ProcessReplies(byte* recvBufferPtr, int bytesRead)
break;

default:
throw new Exception("Unexpected response: " + Encoding.UTF8.GetString(new Span<byte>(recvBufferPtr, bytesRead).ToArray()).Replace("\n", "|").Replace("\r", "") + "]");
throw new Exception("Unexpected response: " + Encoding.UTF8.GetString(new Span<byte>(recvBufferPtr, bytesRead)).Replace("\n", "|").Replace("\r", "") + "]");
}

if (!success) return readHead;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public Task<string> Authenticate(string username, string password)
if (username != null)
{
//2
while (!RespWriteUtils.WriteBulkString(username, ref curr, end))
while (!RespWriteUtils.WriteAsciiBulkString(username, ref curr, end))
{
Flush();
curr = offset;
Expand All @@ -63,7 +63,7 @@ public Task<string> Authenticate(string username, string password)
}

//3
while (!RespWriteUtils.WriteBulkString(password, ref curr, end))
while (!RespWriteUtils.WriteAsciiBulkString(password, ref curr, end))
{
Flush();
curr = offset;
Expand Down Expand Up @@ -128,7 +128,7 @@ public Task<string> SetSlotRange(Memory<byte> state, string nodeid, List<(int, i
if (nodeid != null)
{
//4
while (!RespWriteUtils.WriteBulkString(nodeid, ref curr, end))
while (!RespWriteUtils.WriteAsciiBulkString(nodeid, ref curr, end))
{
Flush();
curr = offset;
Expand Down Expand Up @@ -198,7 +198,7 @@ public Task<string> MigrateData(string sourceNodeId, Memory<byte> replaceOption,
offset = curr;

//3
while (!RespWriteUtils.WriteBulkString(sourceNodeId, ref curr, end))
while (!RespWriteUtils.WriteAsciiBulkString(sourceNodeId, ref curr, end))
{
Flush();
curr = offset;
Expand Down Expand Up @@ -365,7 +365,7 @@ public void SetClusterMigrate(string sourceNodeId, Memory<byte> replaceOption, M
offset = curr;

//3
while (!RespWriteUtils.WriteBulkString(sourceNodeId, ref curr, end))
while (!RespWriteUtils.WriteAsciiBulkString(sourceNodeId, ref curr, end))
{
Flush();
curr = offset;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,15 +61,15 @@ public Task<string> ExecuteReplicaSync(string nodeId, string primary_replid, byt
offset = curr;

//3
while (!RespWriteUtils.WriteBulkString(nodeId, ref curr, end))
while (!RespWriteUtils.WriteAsciiBulkString(nodeId, ref curr, end))
{
Flush();
curr = offset;
}
offset = curr;

//4
while (!RespWriteUtils.WriteBulkString(primary_replid, ref curr, end))
while (!RespWriteUtils.WriteAsciiBulkString(primary_replid, ref curr, end))
{
Flush();
curr = offset;
Expand Down Expand Up @@ -294,31 +294,31 @@ public Task<string> ExecuteBeginReplicaRecover(bool sendStoreCheckpoint, bool se
offset = curr;

//3
while (!RespWriteUtils.WriteBulkString(sendStoreCheckpoint ? Encoding.ASCII.GetBytes("1") : Encoding.ASCII.GetBytes("0"), ref curr, end))
while (!RespWriteUtils.WriteBulkString(sendStoreCheckpoint ? "1"u8 : "0"u8, ref curr, end))
{
Flush();
curr = offset;
}
offset = curr;

//4
while (!RespWriteUtils.WriteBulkString(sendObjectStoreCheckpoint ? Encoding.ASCII.GetBytes("1") : Encoding.ASCII.GetBytes("0"), ref curr, end))
while (!RespWriteUtils.WriteBulkString(sendObjectStoreCheckpoint ? "1"u8 : "0"u8, ref curr, end))
{
Flush();
curr = offset;
}
offset = curr;

//5
while (!RespWriteUtils.WriteBulkString(replayAOF ? Encoding.ASCII.GetBytes("1") : Encoding.ASCII.GetBytes("0"), ref curr, end))
while (!RespWriteUtils.WriteBulkString(replayAOF ? "1"u8 : "0"u8, ref curr, end))
{
Flush();
curr = offset;
}
offset = curr;

//6
while (!RespWriteUtils.WriteBulkString(primary_replid, ref curr, end))
while (!RespWriteUtils.WriteAsciiBulkString(primary_replid, ref curr, end))
{
Flush();
curr = offset;
Expand Down
10 changes: 5 additions & 5 deletions libs/client/GarnetClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -527,9 +527,9 @@ async ValueTask InternalExecuteAsync(TcsWrapper tcs, Memory<byte> op, string par

RespWriteUtils.WriteDirect(op.Span, ref curr, end);
if (param1 != null)
RespWriteUtils.WriteBulkString(param1, ref curr, end);
RespWriteUtils.WriteUtf8BulkString(param1, ref curr, end);
if (param2 != null)
RespWriteUtils.WriteBulkString(param2, ref curr, end);
RespWriteUtils.WriteUtf8BulkString(param2, ref curr, end);

Debug.Assert(curr == end);
}
Expand Down Expand Up @@ -651,7 +651,7 @@ async ValueTask InternalExecuteAsync(Memory<byte> op, Memory<byte> clusterOp, st

RespWriteUtils.WriteDirect(op.Span, ref curr, end);
RespWriteUtils.WriteBulkString(clusterOp.Span, ref curr, end);
RespWriteUtils.WriteBulkString(nodeId, ref curr, end);
RespWriteUtils.WriteUtf8BulkString(nodeId, ref curr, end);
RespWriteUtils.WriteArrayItem(currentAddress, ref curr, end);
RespWriteUtils.WriteArrayItem(nextAddress, ref curr, end);
RespWriteUtils.WriteBulkString(new Span<byte>((void*)payloadPtr, payloadLength), ref curr, end);
Expand Down Expand Up @@ -854,11 +854,11 @@ async ValueTask InternalExecuteAsync(TcsWrapper tcs, string op, ICollection<stri
byte* end = curr + totalLen;
RespWriteUtils.WriteArrayLength(arraySize, ref curr, end);

RespWriteUtils.WriteBulkString(op, ref curr, end);//Write op data
RespWriteUtils.WriteAsciiBulkString(op, ref curr, end);//Write op data
if (isArray)//Write arg data
{
foreach (var arg in args)
RespWriteUtils.WriteBulkString(arg, ref curr, end);
RespWriteUtils.WriteUtf8BulkString(arg, ref curr, end);
}

Debug.Assert(curr == end);
Expand Down
4 changes: 2 additions & 2 deletions libs/cluster/Server/ClusterManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -228,11 +228,11 @@ public ReadOnlySpan<byte> TrySetLocalConfigEpoch(long configEpoch)
{
var current = currentConfig;
if (current.NumWorkers == 0)
return new ReadOnlySpan<byte>(Encoding.ASCII.GetBytes($"-ERR workers not initialized.\r\n"));
return "-ERR workers not initialized.\r\n"u8;

var newConfig = currentConfig.SetLocalWorkerConfigEpoch(configEpoch);
if (newConfig == null)
return new ReadOnlySpan<byte>(Encoding.ASCII.GetBytes($"-ERR Node config epoch was not set due to invalid epoch specified.\r\n"));
return "-ERR Node config epoch was not set due to invalid epoch specified.\r\n"u8;

if (Interlocked.CompareExchange(ref currentConfig, newConfig, current) == current)
break;
Expand Down
Loading

0 comments on commit f2ee39a

Please sign in to comment.