Skip to content

Commit

Permalink
Handle large writes in SendAndReset, adjust index default size (#381)
Browse files Browse the repository at this point in the history
* Handle large writes in SendAndReset, support larger echo responses by splitting the response.

* remove nested try, add stats increment to RespParsingException catch path

* Set default hash index size to 128m
Provide logger info on what the size optimizes for

* fix object store index default size to be 16m instead of 1g

* Set checkpointDir to logDir (or current directory if logDir is null) if checkpointDir is not directly provided.

* fix warning
  • Loading branch information
badrishc committed May 14, 2024
1 parent 6ce2871 commit 0bdc7a3
Show file tree
Hide file tree
Showing 10 changed files with 81 additions and 36 deletions.
9 changes: 9 additions & 0 deletions libs/common/GarnetException.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// Licensed under the MIT license.

using System;
using System.Diagnostics.CodeAnalysis;

namespace Garnet.common
{
Expand Down Expand Up @@ -33,5 +34,13 @@ public GarnetException(string message) : base(message)
public GarnetException(string message, Exception innerException) : base(message, innerException)
{
}

/// <summary>
/// Throw helper that throws a GarnetException.
/// </summary>
/// <param name="message">Exception message.</param>
[DoesNotReturn]
public static void Throw(string message) =>
throw new GarnetException(message);
}
}
2 changes: 1 addition & 1 deletion libs/common/Parsing/RespParsingException.cs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ public static unsafe void ThrowIntegerOverflow(byte* buffer, int length)
/// </summary>
/// <param name="message">Exception message.</param>
[DoesNotReturn]
public static void Throw(string message) =>
public static new void Throw(string message) =>
throw new RespParsingException(message);
}
}
2 changes: 1 addition & 1 deletion libs/common/RespWriteUtils.cs
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ public static bool WriteError(ReadOnlySpan<char> errorString, ref byte* curr, by
/// <summary>
/// Writes the contents of <paramref name="span"/> as byte array to <paramref name="curr"/>
/// </summary>
/// <returns><see langword="true"/> if the the <paramref name="span"/> could be written to <paramref name="curr"/>; <see langword="false"/> otherwise.</returns>
/// <returns><see langword="true"/> if the <paramref name="span"/> could be written to <paramref name="curr"/>; <see langword="false"/> otherwise.</returns>
public static bool WriteDirect(ReadOnlySpan<byte> span, ref byte* curr, byte* end)
{
if (span.Length > (int)(end - curr))
Expand Down
2 changes: 1 addition & 1 deletion libs/host/Configuration/Options.cs
Original file line number Diff line number Diff line change
Expand Up @@ -482,7 +482,7 @@ public GarnetServerOptions GetServerOptions(ILogger logger = null)
var logDir = LogDir;
if (!useAzureStorage && enableStorageTier) logDir = new DirectoryInfo(string.IsNullOrEmpty(logDir) ? "." : logDir).FullName;
var checkpointDir = CheckpointDir;
if (!useAzureStorage) checkpointDir = new DirectoryInfo(string.IsNullOrEmpty(checkpointDir) ? "." : checkpointDir).FullName;
if (!useAzureStorage) checkpointDir = new DirectoryInfo(string.IsNullOrEmpty(checkpointDir) ? (string.IsNullOrEmpty(logDir) ? "." : logDir) : checkpointDir).FullName;

var address = !string.IsNullOrEmpty(this.Address) && this.Address.Equals("localhost", StringComparison.CurrentCultureIgnoreCase)
? IPAddress.Loopback.ToString()
Expand Down
4 changes: 2 additions & 2 deletions libs/host/defaults.conf
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
"SegmentSize" : "1g",

/* Size of hash index in bytes (rounds down to power of 2) */
"IndexSize" : "8g",
"IndexSize" : "128m",

/* Max size of hash index in bytes (rounds down to power of 2) */
"IndexMaxSize": "",
Expand All @@ -40,7 +40,7 @@
"ObjectStoreSegmentSize" : "32m",

/* Size of object store hash index in bytes (rounds down to power of 2) */
"ObjectStoreIndexSize" : "1g",
"ObjectStoreIndexSize" : "16m",

/* Max size of object store hash index in bytes (rounds down to power of 2) */
"ObjectStoreIndexMaxSize": "",
Expand Down
3 changes: 1 addition & 2 deletions libs/server/Resp/AdminCommands.cs
Original file line number Diff line number Diff line change
Expand Up @@ -240,8 +240,7 @@ private bool ProcessAdminCommands<TGarnetApi>(RespCommand command, ReadOnlySpan<
GetCommand(bufSpan, out bool success1);
if (!success1) return false;
var length = readHead - oldReadHead;
while (!RespWriteUtils.WriteDirect(bufSpan.Slice(oldReadHead, length), ref dcurr, dend))
SendAndReset();
WriteDirectLarge(bufSpan.Slice(oldReadHead, length));
}
}
else if (command == RespCommand.INFO)
Expand Down
67 changes: 49 additions & 18 deletions libs/server/Resp/RespServerSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -211,28 +211,23 @@ public override int TryConsumeMessages(byte* reqBuffer, int bytesReceived)
clusterSession?.AcquireCurrentEpoch();
recvBufferPtr = reqBuffer;
networkSender.GetResponseObject();

try
{
ProcessMessages();
}
catch (RespParsingException ex)
{
logger?.LogCritical($"Aborting open session due to RESP parsing error: {ex.Message}");
logger?.LogDebug(ex, "RespParsingException in ProcessMessages:");

// Forward parsing error as RESP error
while (!RespWriteUtils.WriteError($"ERR Protocol Error: {ex.Message}", ref dcurr, dend))
SendAndReset();

// Send message and dispose the network sender to end the session
Send(networkSender.GetResponseObjectHead());
networkSender.Dispose();
}
ProcessMessages();
recvBufferPtr = null;
}
catch (RespParsingException ex)
{
sessionMetrics?.incr_total_number_resp_server_session_exceptions(1);
logger?.LogCritical($"Aborting open session due to RESP parsing error: {ex.Message}");
logger?.LogDebug(ex, "RespParsingException in ProcessMessages:");

// Forward parsing error as RESP error
while (!RespWriteUtils.WriteError($"ERR Protocol Error: {ex.Message}", ref dcurr, dend))
SendAndReset();

// Send message and dispose the network sender to end the session
Send(networkSender.GetResponseObjectHead());
networkSender.Dispose();
}
catch (Exception ex)
{
sessionMetrics?.incr_total_number_resp_server_session_exceptions(1);
Expand Down Expand Up @@ -769,6 +764,13 @@ private void SendAndReset()
dcurr = networkSender.GetResponseObjectHead();
dend = networkSender.GetResponseObjectTail();
}
else
{
// Reaching here means that we retried SendAndReset without the RespWriteUtils.Write*
// method making any progress. This should only happen when the message being written is
// too large to fit in the response buffer.
GarnetException.Throw("Failed to write to response buffer");
}
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
Expand Down Expand Up @@ -813,6 +815,35 @@ private void SendAndReset(IMemoryOwner<byte> memory, int length)
memory.Dispose();
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
private void WriteDirectLarge(ReadOnlySpan<byte> src)
{
// Repeat while we have bytes left to write
while (src.Length > 0)
{
// Compute space left on output buffer
int destSpace = (int)(dend - dcurr);

// Fast path if there is enough space
if (src.Length <= destSpace)
{
src.CopyTo(new Span<byte>(dcurr, src.Length));
dcurr += src.Length;
break;
}

// Adjust number of bytes to copy, to space left on output buffer, then copy
src.Slice(0, destSpace).CopyTo(new Span<byte>(dcurr, destSpace));
src = src.Slice(destSpace);

// Send and reset output buffer
Send(networkSender.GetResponseObjectHead());
networkSender.GetResponseObject();
dcurr = networkSender.GetResponseObjectHead();
dend = networkSender.GetResponseObjectTail();
}
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
private void Send(byte* d)
{
Expand Down
12 changes: 9 additions & 3 deletions libs/server/Servers/GarnetServerOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public class GarnetServerOptions : ServerOptions
/// <summary>
/// Size of object store hash index in bytes (rounds down to power of 2).
/// </summary>
public string ObjectStoreIndexSize = "1g";
public string ObjectStoreIndexSize = "16m";

/// <summary>
/// Max size of object store hash index in bytes (rounds down to power of 2).
Expand Down Expand Up @@ -381,14 +381,17 @@ public void GetSettings(out LogSettings logSettings, out int indexSize, out Revi

indexSize = IndexSizeCachelines("hash index size", IndexSize);
logger?.LogInformation($"[Store] Using hash index size of {PrettySize(indexSize * 64L)} ({PrettySize(indexSize)} cache lines)");
logger?.LogInformation($"[Store] Hash index size is optimized for up to ~{PrettySize(indexSize * 4L)} distinct keys");

AdjustedIndexMaxSize = IndexMaxSize == string.Empty ? 0 : IndexSizeCachelines("hash index max size", IndexMaxSize);
if (AdjustedIndexMaxSize != 0 && AdjustedIndexMaxSize < indexSize)
throw new Exception($"Index size {IndexSize} should not be less than index max size {IndexMaxSize}");

if (AdjustedIndexMaxSize > 0)
{
logger?.LogInformation($"[Store] Using hash index max size of {PrettySize(AdjustedIndexMaxSize * 64L)}, ({PrettySize(AdjustedIndexMaxSize)} cache lines)");

logger?.LogInformation($"[Store] Hash index max size is optimized for up to ~{PrettySize(AdjustedIndexMaxSize * 4L)} distinct keys");
}
logger?.LogInformation($"[Store] Using log mutable percentage of {MutablePercent}%");

if (DeviceFactoryCreator == null)
Expand Down Expand Up @@ -521,14 +524,17 @@ public void GetObjectStoreSettings(out LogSettings objLogSettings, out Revivific

objIndexSize = IndexSizeCachelines("object store hash index size", ObjectStoreIndexSize);
logger?.LogInformation($"[Object Store] Using hash index size of {PrettySize(objIndexSize * 64L)} ({PrettySize(objIndexSize)} cache lines)");
logger?.LogInformation($"[Object Store] Hash index size is optimized for up to ~{PrettySize(objIndexSize * 4L)} distinct keys");

AdjustedObjectStoreIndexMaxSize = ObjectStoreIndexMaxSize == string.Empty ? 0 : IndexSizeCachelines("hash index max size", ObjectStoreIndexMaxSize);
if (AdjustedObjectStoreIndexMaxSize != 0 && AdjustedObjectStoreIndexMaxSize < objIndexSize)
throw new Exception($"Index size {IndexSize} should not be less than index max size {IndexMaxSize}");

if (AdjustedObjectStoreIndexMaxSize > 0)
{
logger?.LogInformation($"[Object Store] Using hash index max size of {PrettySize(AdjustedObjectStoreIndexMaxSize * 64L)}, ({PrettySize(AdjustedObjectStoreIndexMaxSize)} cache lines)");

logger?.LogInformation($"[Object Store] Hash index max size is optimized for up to ~{PrettySize(AdjustedObjectStoreIndexMaxSize * 4L)} distinct keys");
}
logger?.LogInformation($"[Object Store] Using log mutable percentage of {ObjectStoreMutablePercent}%");

objTotalMemorySize = ParseSize(ObjectStoreTotalMemorySize);
Expand Down
2 changes: 1 addition & 1 deletion libs/server/Servers/ServerOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public class ServerOptions
/// <summary>
/// Size of hash index in bytes (rounds down to power of 2).
/// </summary>
public string IndexSize = "8g";
public string IndexSize = "128m";

/// <summary>
/// Max size of hash index in bytes (rounds down to power of 2). If unspecified, index size doesn't grow (default behavior).
Expand Down
14 changes: 7 additions & 7 deletions test/Garnet.test/GarnetServerConfigTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -81,12 +81,12 @@ public void ImportExportConfigLocal()
// No import path, include command line args, export to file
// Check values from command line override values from defaults.conf
static string GetFullExtensionBinPath(string testProjectName) => Path.GetFullPath(testProjectName, TestUtils.RootTestsProjectPath);
var args = new string[] { "--config-export-path", configPath, "-p", "4m", "-m", "8g", "-s", "2g", "--recover", "--port", "53", "--reviv-obj-bin-record-count", "2", "--reviv-fraction", "0.5", "--extension-bin-paths", $"{GetFullExtensionBinPath("Garnet.test")},{GetFullExtensionBinPath("Garnet.test.cluster")}" };
var args = new string[] { "--config-export-path", configPath, "-p", "4m", "-m", "128m", "-s", "2g", "--recover", "--port", "53", "--reviv-obj-bin-record-count", "2", "--reviv-fraction", "0.5", "--extension-bin-paths", $"{GetFullExtensionBinPath("Garnet.test")},{GetFullExtensionBinPath("Garnet.test.cluster")}" };
parseSuccessful = ServerSettingsManager.TryParseCommandLineArguments(args, out options, out invalidOptions);
Assert.IsTrue(parseSuccessful);
Assert.AreEqual(invalidOptions.Count, 0);
Assert.AreEqual("4m", options.PageSize);
Assert.AreEqual("8g", options.MemorySize);
Assert.AreEqual("128m", options.MemorySize);
Assert.AreEqual("2g", options.SegmentSize);
Assert.AreEqual(53, options.Port);
Assert.AreEqual(2, options.RevivObjBinRecordCount);
Expand All @@ -102,7 +102,7 @@ public void ImportExportConfigLocal()
Assert.IsTrue(parseSuccessful);
Assert.AreEqual(invalidOptions.Count, 0);
Assert.IsTrue(options.PageSize == "4m");
Assert.IsTrue(options.MemorySize == "8g");
Assert.IsTrue(options.MemorySize == "128m");

// Import from previous export command, include command line args, export to file
// Check values from import path override values from default.conf, and values from command line override values from default.conf and import path
Expand All @@ -111,7 +111,7 @@ public void ImportExportConfigLocal()
Assert.IsTrue(parseSuccessful);
Assert.AreEqual(invalidOptions.Count, 0);
Assert.AreEqual("12m", options.PageSize);
Assert.AreEqual("8g", options.MemorySize);
Assert.AreEqual("128m", options.MemorySize);
Assert.AreEqual("1g", options.SegmentSize);
Assert.AreEqual(0, options.Port);
Assert.IsFalse(options.Recover);
Expand Down Expand Up @@ -205,19 +205,19 @@ public void ImportExportConfigAzure()
Assert.IsTrue(options.PageSize == "32m");
Assert.IsTrue(options.MemorySize == "16g");

var args = new string[] { "--storage-string", AzureEmulatedStorageString, "--use-azure-storage-for-config-export", "true", "--config-export-path", configPath, "-p", "4m", "-m", "8g" };
var args = new string[] { "--storage-string", AzureEmulatedStorageString, "--use-azure-storage-for-config-export", "true", "--config-export-path", configPath, "-p", "4m", "-m", "128m" };
parseSuccessful = ServerSettingsManager.TryParseCommandLineArguments(args, out options, out invalidOptions);
Assert.IsTrue(parseSuccessful);
Assert.AreEqual(invalidOptions.Count, 0);
Assert.IsTrue(options.PageSize == "4m");
Assert.IsTrue(options.MemorySize == "8g");
Assert.IsTrue(options.MemorySize == "128m");

args = ["--storage-string", AzureEmulatedStorageString, "--use-azure-storage-for-config-import", "true", "--config-import-path", configPath];
parseSuccessful = ServerSettingsManager.TryParseCommandLineArguments(args, out options, out invalidOptions);
Assert.IsTrue(parseSuccessful);
Assert.AreEqual(invalidOptions.Count, 0);
Assert.IsTrue(options.PageSize == "4m");
Assert.IsTrue(options.MemorySize == "8g");
Assert.IsTrue(options.MemorySize == "128m");

// Delete blob
deviceFactory.Initialize(AzureTestDirectory);
Expand Down

0 comments on commit 0bdc7a3

Please sign in to comment.