Skip to content

Commit

Permalink
Unify RESP error serialization (#252)
Browse files Browse the repository at this point in the history
* Add WriteGenericError methods to RespWriteUtils

* Swap direct inline errors string writes to WriteGenericError

* Prefix all generic error CmdStrings to make them distinguishable from other errors

* wip: begin moving CmdStrings to GenericErrorWrite

* Edit custom command registration logic to allow using the new generic error logic

* Rename the formatting string to similar convention as the static generic errors

* Fix tests to handle new error formatting

* Prefix all generic error CmdStrings in cluster project

* wip: Begin migrating cluster errors to WriteGenericError

* Some refactoring done in ProcessSlotManage to be able discern failure modes

* Also potential unused "errorMessage" in TryResetSlotsState?

* Make SETSLOT use generic error serialization

* Make SET-CONFIG-EPOCH use generic error serialization

* Make FORGET use generic error serialization

* Make REPLICATE use generic error serialization

* Make aofsync use generic error serialization

* Make ADDSLOTS and ADDSLOTSRANGE use generic error serialization

* Make DELSLOTS and DELSLOTSRANGE use generic error serialization

* Make DELKEYSINSLOTRANGE use generic error serialization

* Also flattened control flow where else block is unnecessary

* Restore behavior in SETSLOTSRANGE which was affected by TryParseSlots changes

* Make REPLICAOF use generic error serialization

* Make MIGRATE use generic error serialization

* Make FAILOVER use generic error serialization

* Make FAILOVER use generic error serialization

* Well, the ProcessFailoverCommands method

* Make HISTOGRAM use generic error serialization

* Now moving back over to Garnet.Server

* Make RESET use generic error serialization

* Make HashObjectImpl use generic error serialization

* Missed one -ERR prefix in RESET

* Make SortedSetObjectImpl use generic error serialization

* Move many trivial ones in Garnet.server

* Regexing these, no complex control flow to worry about.

* dotnet-format

* Adjust tests a bit

* Flip some assert.equals ordering to give correct expect & actual pairs

* Couple collection experssions, couldn't help myself.

* Restore some error equals assert to original behavior

* Fix one error PRIMARY-ERR format

* Restore behavior in ClusterCommand around slot parsing

* Make HandleCommandParsingErrors use generic error serialization

* This is used by TryMIGRATE

* Move more WriteDirect to WriteGenericError

* dotnet-format

* Abort the plan. Remove WriteGenericError.

* And switch over to WriteError

* Adjust tests for the WriteError

* And few simplifications I came across

* Migrate libs/cluster to WriteError

* Migrate libs/server to WriteError

* Missed couple in cluster

* I'm blind

* Remove negative-sign from logged errors

* Refactor cluster slot commands to use return values instead of errorMessages

* Address PR feedback

* TryResetSlot(s)State -> ResetSlot(s)State

---------

Co-authored-by: Badrish Chandramouli <badrishc@microsoft.com>
Co-authored-by: vazois <vazois@microsoft.com>
  • Loading branch information
3 people committed Apr 8, 2024
1 parent 763c517 commit 765e614
Show file tree
Hide file tree
Showing 54 changed files with 875 additions and 690 deletions.
55 changes: 28 additions & 27 deletions libs/cluster/CmdStrings.cs
Original file line number Diff line number Diff line change
Expand Up @@ -92,43 +92,44 @@ static class CmdStrings
/// Response strings
/// </summary>
public static ReadOnlySpan<byte> RESP_OK => "+OK\r\n"u8;
public static ReadOnlySpan<byte> RESP_NOAUTH => "-NOAUTH Authentication required.\r\n"u8;
public static ReadOnlySpan<byte> RESP_RETURN_VAL_1 => ":1\r\n"u8;
public static ReadOnlySpan<byte> RESP_RETURN_VAL_0 => ":0\r\n"u8;
public static ReadOnlySpan<byte> RESP_RETURN_VAL_N1 => ":-1\r\n"u8;

/// <summary>
/// Response string templates
/// </summary>
public const string ErrMissingParam = "-ERR wrong number of arguments for '{0}' command\r\n";

public const string GenericErrMissingParam = "ERR wrong number of arguments for '{0}' command";

/// <summary>
/// Error response strings
/// </summary>
public static ReadOnlySpan<byte> RESP_ERRCLUSTER => "-ERR This instance has cluster support disabled\r\n"u8;
public static ReadOnlySpan<byte> RESP_SLOT_OUT_OFF_RANGE => "-ERR Slot out of range\r\n"u8;
public static ReadOnlySpan<byte> RESP_CONFIG_UPDATE_ERROR => "-ERR Updating the config epoch\r\n"u8;
public static ReadOnlySpan<byte> RESP_CONFIG_EPOCH_ASSIGNMENT_ERROR => "-ERR The user can assign a config epoch only when the node does not know any other node.\r\n"u8;
public static ReadOnlySpan<byte> RESP_REPLICATION_AOF_TURNEDOFF_ERROR => "-ERR Replica AOF is switched off. Replication unavailable. Please restart replica with --aof option.\r\n"u8;
public static ReadOnlySpan<byte> RESP_SLOTSTATE_TRANSITION_ERROR => "-ERR Slot already in that state\r\n"u8;
public static ReadOnlySpan<byte> RESP_CANNOT_REPLICATE_SELF_ERROR => "-ERR Can't replicate myself\r\n"u8;
public static ReadOnlySpan<byte> RESP_CANNOT_MAKE_REPLICA_WITH_ASSIGNED_SLOTS_ERROR => "-ERR Primary has been assigned slots and cannot be a replica\r\n"u8;
public static ReadOnlySpan<byte> RESP_CANNOT_FORGET_MYSELF_ERROR => "-ERR I tried hard but I can't forget myself\r\n"u8;
public static ReadOnlySpan<byte> RESP_CANNOT_FORGET_MY_PRIMARY_ERROR => "-ERR Can't forget my primary\r\n"u8;
public static ReadOnlySpan<byte> RESP_CANNOT_FAILOVER_FROM_NON_MASTER => "-ERR Cannot failover a non-master node\r\n"u8;
public static ReadOnlySpan<byte> RESP_UNKNOWN_ENDPOINT_ERROR => "-ERR Unknown endpoint\r\n"u8;
/// Generic error respone strings, i.e. these are sent in the form "-ERR responseString\r\n"
/// </summary>
public static ReadOnlySpan<byte> RESP_ERR_GENERIC_CLUSTER => "ERR This instance has cluster support disabled"u8;
public static ReadOnlySpan<byte> RESP_ERR_GENERIC_SLOT_OUT_OFF_RANGE => "ERR Slot out of range"u8;
public static ReadOnlySpan<byte> RESP_ERR_GENERIC_CONFIG_UPDATE => "ERR Updating the config epoch"u8;
public static ReadOnlySpan<byte> RESP_ERR_GENERIC_CONFIG_EPOCH_ASSIGNMENT => "ERR The user can assign a config epoch only when the node does not know any other node."u8;
public static ReadOnlySpan<byte> RESP_ERR_GENERIC_REPLICATION_AOF_TURNEDOFF => "ERR Replica AOF is switched off. Replication unavailable. Please restart replica with --aof option."u8;
public static ReadOnlySpan<byte> RESP_ERR_GENERIC_SLOTSTATE_TRANSITION => "ERR Slot already in that state"u8;
public static ReadOnlySpan<byte> RESP_ERR_GENERIC_CANNOT_FORGET_MYSELF => "ERR I tried hard but I can't forget myself"u8;
public static ReadOnlySpan<byte> RESP_ERR_GENERIC_CANNOT_FORGET_MY_PRIMARY => "ERR Can't forget my primary"u8;
public static ReadOnlySpan<byte> RESP_ERR_GENERIC_CANNOT_FAILOVER_FROM_NON_MASTER => "ERR Cannot failover a non-master node"u8;
public static ReadOnlySpan<byte> RESP_ERR_GENERIC_UNKNOWN_ENDPOINT => "ERR Unknown endpoint"u8;
public static ReadOnlySpan<byte> RESP_ERR_GENERIC_CANNOT_MAKE_REPLICA_WITH_ASSIGNED_SLOTS => "ERR Primary has been assigned slots and cannot be a replica"u8;
public static ReadOnlySpan<byte> RESP_ERR_GENERIC_CANNOT_REPLICATE_SELF => "ERR Can't replicate myself"u8;

#region RespSlotValidation
public static ReadOnlySpan<byte> RESP_CROSSLOT_ERROR => "-CROSSSLOT Keys in request do not hash to the same slot\r\n"u8;
public static ReadOnlySpan<byte> RESP_CLUSTERDOWN_ERROR => "-CLUSTERDOWN Hash slot not served\r\n"u8;
public static ReadOnlySpan<byte> RESP_MIGRATING_ERROR => "-MIGRATING\r\n"u8;
#endregion
/// <summary>
/// Generic error response strings for <c>MIGRATE</c> command
/// </summary>
public static ReadOnlySpan<byte> RESP_ERR_GENERIC_MIGRATE_TO_MYSELF => "ERR Can't MIGRATE to myself"u8;
public static ReadOnlySpan<byte> RESP_ERR_GENERIC_INCOMPLETESLOTSRANGE => "ERR incomplete slotrange"u8;
public static ReadOnlySpan<byte> RESP_ERR_GENERIC_PARSING => "ERR Parsing error"u8;

#region RespMigrateCmd
public static ReadOnlySpan<byte> RESP_MIGRATE_TO_MYSELF_ERROR => "-ERR Can't MIGRATE to myself\r\n"u8;
public static ReadOnlySpan<byte> RESP_INCOMPLETESLOTSRANGE_ERROR => "-ERR incomplete slotrange\r\n"u8;
public static ReadOnlySpan<byte> RESP_PARSING_ERROR => "-ERR Parsing error\r\n"u8;
#endregion
/// <summary>
/// Simple error respone strings, i.e. these are of the form "-errorString\r\n"
/// </summary>
public static ReadOnlySpan<byte> RESP_ERR_NOAUTH => "NOAUTH Authentication required."u8;
public static ReadOnlySpan<byte> RESP_ERR_CROSSLOT => "CROSSSLOT Keys in request do not hash to the same slot"u8;
public static ReadOnlySpan<byte> RESP_ERR_CLUSTERDOWN => "CLUSTERDOWN Hash slot not served"u8;
public static ReadOnlySpan<byte> RESP_ERR_MIGRATING => "MIGRATING"u8;
}
}
37 changes: 21 additions & 16 deletions libs/cluster/Server/ClusterConfig.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1031,63 +1031,68 @@ public ClusterConfig TakeOverFromPrimary()
}

/// <summary>
/// Make local node owner of list of slots given.
/// Fails if slot already owned by someone else according to a message received from the gossip protocol.
/// Try to make local node owner of list of slots given.
/// </summary>
/// <param name="slots">Slots to assign.</param>
/// <param name="slotAssigned">Slot already assigned if any during this bulk op.</param>
/// <param name="config">ClusterConfig object with updates</param>
/// <param name="state">SlotState type to be set.</param>
/// <returns>ClusterConfig object with updates.</returns>
public ClusterConfig AddSlots(List<int> slots, out int slotAssigned, SlotState state = SlotState.STABLE)
/// <returns><see langword="false"/> if slot already owned by someone else according to a message received from the gossip protocol; otherwise <see langword="true"/>.</returns>
public bool TryAddSlots(HashSet<int> slots, out int slotAssigned, out ClusterConfig config, SlotState state = SlotState.STABLE)
{
var newSlotMap = new HashSlot[16384];
Array.Copy(slotMap, newSlotMap, slotMap.Length);
slotAssigned = -1;
config = null;

var newSlotMap = new HashSlot[16384];
Array.Copy(slotMap, newSlotMap, slotMap.Length);
if (slots != null)
{
foreach (int slot in slots)
{
if (newSlotMap[slot].workerId != 0)
{
slotAssigned = slot;
return null;
return false;
}
newSlotMap[slot]._workerId = 1;
newSlotMap[slot]._state = state;
}
}

return new ClusterConfig(newSlotMap, workers);
config = new ClusterConfig(newSlotMap, workers);
return true;
}

/// <summary>
/// Remove slots from this local node.
/// Try to remove slots from this local node.
/// </summary>
/// <param name="slots">Slots to be removed.</param>
/// <param name="notLocalSlot">If a slot provided is not local.</param>
/// <returns>ClusterConfig object with updates.</returns>
public ClusterConfig RemoveSlots(List<int> slots, out int notLocalSlot)
/// <param name="notLocalSlot">The slot number that is not local.</param>
/// <param name="config">ClusterConfig object with updates</param>
/// <returns><see langword="false"/> if a slot provided is not local; otherwise <see langword="true"/>.</returns>
public bool TryRemoveSlots(HashSet<int> slots, out int notLocalSlot, out ClusterConfig config)
{
var newSlotMap = new HashSlot[16384];
Array.Copy(slotMap, newSlotMap, slotMap.Length);
notLocalSlot = -1;
config = null;

var newSlotMap = new HashSlot[16384];
Array.Copy(slotMap, newSlotMap, slotMap.Length);
if (slots != null)
{
foreach (int slot in slots)
{
if (newSlotMap[slot].workerId == 0)
{
notLocalSlot = slot;
return null;
return false;
}
newSlotMap[slot]._workerId = 0;
newSlotMap[slot]._state = SlotState.OFFLINE;
}
}

return new ClusterConfig(newSlotMap, workers);
config = new ClusterConfig(newSlotMap, workers);
return true;
}

/// <summary>
Expand Down
26 changes: 17 additions & 9 deletions libs/cluster/Server/ClusterManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -195,19 +195,19 @@ public string GetInfo()
return ClusterInfo;
}

private static string GetRange(List<int> slots)
private static string GetRange(int[] slots)
{
var range = "> ";
var start = slots[0];
var end = slots[0];
for (var i = 1; i < slots.Count + 1; i++)
for (var i = 1; i < slots.Length + 1; i++)
{
if (i < slots.Count && slots[i] == end + 1)
if (i < slots.Length && slots[i] == end + 1)
end = slots[i];
else
{
range += $"{start}-{end} ";
if (i < slots.Count)
if (i < slots.Length)
{
start = slots[i];
end = slots[i];
Expand All @@ -218,28 +218,36 @@ private static string GetRange(List<int> slots)
}

/// <summary>
/// Update config epoch of local worker
/// Attempts to update config epoch of local worker
/// </summary>
/// <param name="configEpoch"></param>
/// <param name="errorMessage">The ASCII encoded error message if the method returned <see langword="false"/>; otherwise <see langword="default"/></param>
/// <returns></returns>
public ReadOnlySpan<byte> TrySetLocalConfigEpoch(long configEpoch)
public bool TrySetLocalConfigEpoch(long configEpoch, out ReadOnlySpan<byte> errorMessage)
{
errorMessage = default;
while (true)
{
var current = currentConfig;
if (current.NumWorkers == 0)
return "-ERR workers not initialized.\r\n"u8;
{
errorMessage = "ERR workers not initialized."u8;
return false;
}

var newConfig = currentConfig.SetLocalWorkerConfigEpoch(configEpoch);
if (newConfig == null)
return "-ERR Node config epoch was not set due to invalid epoch specified.\r\n"u8;
{
errorMessage = "ERR Node config epoch was not set due to invalid epoch specified."u8;
return false;
}

if (Interlocked.CompareExchange(ref currentConfig, newConfig, current) == current)
break;
}
FlushConfig();
logger?.LogTrace("SetConfigEpoch {configEpoch}", configEpoch);
return CmdStrings.RESP_OK;
return true;
}

/// <summary>
Expand Down

0 comments on commit 765e614

Please sign in to comment.