Skip to content

Commit

Permalink
Cluster Misc Cleanup (#102)
Browse files Browse the repository at this point in the history
* cleanup and refactor redirect tests

* cleanup comments

* cleanup linting errors

* cluster ops error strings cleanup

* cleanup of inline resp error strings

* replication commands error string cleanup

* cleanup inline error strings for migrate command

* more cleanup cluster tests

* refactor resp redirect messages

* more cleanup

* simplify and cleanup redirect tests

* more cleanup in replication tests

* more cluster migrate tests cleanup

* more cleanup cluster redirect tests

* refactor migrate cmd parsing errors

* revert statsInfo gossip stats

* code cleanup

* fix newline at end of file

* Revert benchmark imports

---------

Co-authored-by: Badrish Chandramouli <badrishc@microsoft.com>
  • Loading branch information
vazois and badrishc committed Mar 27, 2024
1 parent 0a9f6b3 commit 9e0b027
Show file tree
Hide file tree
Showing 21 changed files with 1,070 additions and 1,178 deletions.
32 changes: 30 additions & 2 deletions libs/cluster/CmdStrings.cs
Original file line number Diff line number Diff line change
Expand Up @@ -95,12 +95,40 @@ static class CmdStrings
public static ReadOnlySpan<byte> RESP_NOAUTH => "-NOAUTH Authentication required.\r\n"u8;
public static ReadOnlySpan<byte> RESP_RETURN_VAL_1 => ":1\r\n"u8;
public static ReadOnlySpan<byte> RESP_RETURN_VAL_0 => ":0\r\n"u8;
public static ReadOnlySpan<byte> RESP_ERRCLUSTER => "-ERR This instance has cluster support disabled\r\n"u8;
public static ReadOnlySpan<byte> RESP_SLOT_OUT_OFF_RANGE => "-ERR Slot out of range\r\n"u8;
public static ReadOnlySpan<byte> RESP_RETURN_VAL_N1 => ":-1\r\n"u8;

/// <summary>
/// Response string templates
/// </summary>
public const string ErrMissingParam = "-ERR wrong number of arguments for '{0}' command\r\n";


/// <summary>
/// Error response strings
/// </summary>
public static ReadOnlySpan<byte> RESP_ERRCLUSTER => "-ERR This instance has cluster support disabled\r\n"u8;
public static ReadOnlySpan<byte> RESP_SLOT_OUT_OFF_RANGE => "-ERR Slot out of range\r\n"u8;
public static ReadOnlySpan<byte> RESP_CONFIG_UPDATE_ERROR => "-ERR Updating the config epoch\r\n"u8;
public static ReadOnlySpan<byte> RESP_CONFIG_EPOCH_ASSIGNMENT_ERROR => "-ERR The user can assign a config epoch only when the node does not know any other node.\r\n"u8;
public static ReadOnlySpan<byte> RESP_REPLICATION_AOF_TURNEDOFF_ERROR => "-ERR Replica AOF is switched off. Replication unavailable. Please restart replica with --aof option.\r\n"u8;
public static ReadOnlySpan<byte> RESP_SLOTSTATE_TRANSITION_ERROR => "-ERR Slot already in that state\r\n"u8;
public static ReadOnlySpan<byte> RESP_CANNOT_REPLICATE_SELF_ERROR => "-ERR Can't replicate myself\r\n"u8;
public static ReadOnlySpan<byte> RESP_CANNOT_MAKE_REPLICA_WITH_ASSIGNED_SLOTS_ERROR => "-ERR Primary has been assigned slots and cannot be a replica\r\n"u8;
public static ReadOnlySpan<byte> RESP_CANNOT_FORGET_MYSELF_ERROR => "-ERR I tried hard but I can't forget myself\r\n"u8;
public static ReadOnlySpan<byte> RESP_CANNOT_FORGET_MY_PRIMARY_ERROR => "-ERR Can't forget my primary\r\n"u8;
public static ReadOnlySpan<byte> RESP_CANNOT_FAILOVER_FROM_NON_MASTER => "-ERR Cannot failover a non-master node\r\n"u8;
public static ReadOnlySpan<byte> RESP_UNKNOWN_ENDPOINT_ERROR => "-ERR Unknown endpoint\r\n"u8;

#region RespSlotValidation
public static ReadOnlySpan<byte> RESP_CROSSLOT_ERROR => "-CROSSSLOT Keys in request do not hash to the same slot\r\n"u8;
public static ReadOnlySpan<byte> RESP_CLUSTERDOWN_ERROR => "-CLUSTERDOWN Hash slot not served\r\n"u8;
public static ReadOnlySpan<byte> RESP_MIGRATING_ERROR => "-MIGRATING\r\n"u8;
#endregion

#region RespMigrateCmd
public static ReadOnlySpan<byte> RESP_MIGRATE_TO_MYSELF_ERROR => "-ERR Can't MIGRATE to myself\r\n"u8;
public static ReadOnlySpan<byte> RESP_INCOMPLETESLOTSRANGE_ERROR => "-ERR incomplete slotrange\r\n"u8;
public static ReadOnlySpan<byte> RESP_PARSING_ERROR => "-ERR Parsing error\r\n"u8;
#endregion
}
}
67 changes: 24 additions & 43 deletions libs/cluster/Server/ClusterManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,24 +13,6 @@

namespace Garnet.cluster
{
/// <summary>
/// Identifiers for cluster operations, backed by a single byte.
/// </summary>
internal enum ClusterOp : byte
{
// Only member declared here; presumably tags slot-migration work — confirm against callers.
MIGRATION,
}

/// <summary>
/// Equality comparer for <see cref="Worker"/> values that considers only the nodeid member:
/// two workers are equal when their nodeid values are equal, and the hash code is the
/// hash code of nodeid.
/// </summary>
internal class WorkerComparer : IEqualityComparer<Worker>
{
    /// <summary>
    /// Compare two workers by nodeid.
    /// </summary>
    /// <param name="a">First worker</param>
    /// <param name="b">Second worker</param>
    /// <returns>True if both workers carry an equal nodeid</returns>
    public bool Equals(Worker a, Worker b) => a.nodeid.Equals(b.nodeid);

    /// <summary>
    /// Hash a worker by its nodeid so the result is consistent with <see cref="Equals(Worker, Worker)"/>.
    /// </summary>
    /// <param name="key">Worker to hash</param>
    /// <returns>Hash code of the worker's nodeid</returns>
    public int GetHashCode(Worker key) => key.nodeid.GetHashCode();
}

/// <summary>
/// Cluster manager
/// </summary>
Expand Down Expand Up @@ -78,9 +60,9 @@ internal sealed partial class ClusterManager : IDisposable
clusterConfigDevice = deviceFactory.Get(new FileDescriptor(directoryName: "", fileName: "nodes.conf"));
pool = new(1, (int)clusterConfigDevice.SectorSize);

string address = opts.Address ?? StoreWrapper.GetIp();
var address = opts.Address ?? StoreWrapper.GetIp();
logger = loggerFactory?.CreateLogger($"ClusterManager-{address}:{opts.Port}");
bool recoverConfig = clusterConfigDevice.GetFileSize(0) > 0 && !opts.CleanClusterConfig;
var recoverConfig = clusterConfigDevice.GetFileSize(0) > 0 && !opts.CleanClusterConfig;

tlsOptions = opts.TlsOptions;
if (!opts.CleanClusterConfig)
Expand All @@ -91,9 +73,9 @@ internal sealed partial class ClusterManager : IDisposable
if (recoverConfig)
{
logger?.LogTrace("Recover cluster config from disk");
byte[] config = ClusterUtils.ReadDevice(clusterConfigDevice, pool, logger);
var config = ClusterUtils.ReadDevice(clusterConfigDevice, pool, logger);
currentConfig = ClusterConfig.FromByteArray(config);
//Used to update endpoint if it change when running inside a container.
// Used to update endpoint if it changes when running inside a container.
if (address != currentConfig.GetLocalNodeIp() || opts.Port != currentConfig.GetLocalNodePort())
{
logger?.LogInformation(
Expand All @@ -117,7 +99,7 @@ internal sealed partial class ClusterManager : IDisposable
gossipDelay = TimeSpan.FromSeconds(opts.GossipDelay);
clusterTimeout = opts.ClusterTimeout <= 0 ? Timeout.InfiniteTimeSpan : TimeSpan.FromSeconds(opts.ClusterTimeout);
numActiveTasks = 0;
this.GossipSamplePercent = opts.GossipSamplePercent;
GossipSamplePercent = opts.GossipSamplePercent;
}

/// <summary>
Expand Down Expand Up @@ -198,7 +180,7 @@ private void InitLocal(string address, int port, bool recoverConfig)
public string GetInfo()
{
var current = CurrentConfig;
string ClusterInfo = $"" +
var ClusterInfo = $"" +
$"cluster_state:ok\r\n" +
$"cluster_slots_assigned:{current.GetSlotCountForState(SlotState.STABLE)}\r\n" +
$"cluster_slots_ok:{current.GetSlotCountForState(SlotState.STABLE)}\r\n" +
Expand All @@ -215,10 +197,10 @@ public string GetInfo()

private static string GetRange(List<int> slots)
{
string range = "> ";
int start = slots[0];
int end = slots[0];
for (int i = 1; i < slots.Count + 1; i++)
var range = "> ";
var start = slots[0];
var end = slots[0];
for (var i = 1; i < slots.Count + 1; i++)
{
if (i < slots.Count && slots[i] == end + 1)
end = slots[i];
Expand Down Expand Up @@ -276,21 +258,10 @@ public bool TryBumpClusterEpoch()
return true;
}

/// <summary>
/// Bump the local node's current config epoch and publish the resulting config.
/// The update is retried until the compare-and-swap on currentConfig succeeds,
/// after which FlushConfig() is called.
/// </summary>
/// <returns>The current config epoch read from the config that was installed</returns>
public long TryBumpCurrentClusterEpoch()
{
    long bumpedEpoch;
    for (; ; )
    {
        // Snapshot the shared config, derive the bumped copy and remember its epoch.
        var snapshot = currentConfig;
        var candidate = snapshot.BumpLocalNodeCurrentConfigEpoch();
        bumpedEpoch = candidate.GetLocalNodeCurrentConfigEpoch();

        // Install the candidate only if nobody replaced the config since the snapshot; otherwise retry.
        var observed = Interlocked.CompareExchange(ref currentConfig, candidate, snapshot);
        if (observed == snapshot)
            break;
    }

    FlushConfig();
    return bumpedEpoch;
}

/// <summary>
/// Set local node role
/// </summary>
/// <param name="role">Role type</param>
public void TrySetLocalNodeRole(NodeRole role)
{
while (true)
Expand All @@ -303,6 +274,9 @@ public void TrySetLocalNodeRole(NodeRole role)
FlushConfig();
}

/// <summary>
/// Reset node to primary.
/// </summary>
public void TryResetReplica()
{
while (true)
Expand All @@ -315,6 +289,10 @@ public void TryResetReplica()
FlushConfig();
}

/// <summary>
/// Force this node to be a replica of given node-id
/// </summary>
/// <param name="replicaId">Node-id to replicate</param>
public void TryStopWrites(string replicaId)
{
while (true)
Expand All @@ -327,6 +305,9 @@ public void TryStopWrites(string replicaId)
FlushConfig();
}

/// <summary>
/// Take over as new primary by forcefully claiming ownership of the old primary's slots.
/// </summary>
public void TryTakeOverForPrimary()
{
while (true)
Expand Down

0 comments on commit 9e0b027

Please sign in to comment.