diff --git a/libs/cluster/Server/ClusterProvider.cs b/libs/cluster/Server/ClusterProvider.cs
index 4a1de33970..302d65c72a 100644
--- a/libs/cluster/Server/ClusterProvider.cs
+++ b/libs/cluster/Server/ClusterProvider.cs
@@ -287,7 +287,7 @@ internal bool WaitForConfigTransition()
                 foreach (var s in sessions)
                 {
                     var entryEpoch = s.LocalCurrentEpoch;
-                    if (entryEpoch != 0 && entryEpoch >= currentEpoch)
+                    if (entryEpoch != 0 && entryEpoch < currentEpoch)
                         goto retry;
                 }
                 break;
diff --git a/libs/cluster/Server/Replication/CheckpointEntry.cs b/libs/cluster/Server/Replication/CheckpointEntry.cs
index b20a589a3e..a775f28444 100644
--- a/libs/cluster/Server/Replication/CheckpointEntry.cs
+++ b/libs/cluster/Server/Replication/CheckpointEntry.cs
@@ -43,7 +43,7 @@ public CheckpointEntry()
         }
 
         public long GetMinAofCoveredAddress()
-            => Math.Min(storeCheckpointCoveredAofAddress, objectCheckpointCoveredAofAddress);
+            => Math.Max(Math.Min(storeCheckpointCoveredAofAddress, objectCheckpointCoveredAofAddress), 64);
 
         /// <summary>
         /// Indicate addition of new reader by trying to increment reader counter
diff --git a/libs/cluster/Server/Replication/PrimaryOps/ReplicaSyncSession.cs b/libs/cluster/Server/Replication/PrimaryOps/ReplicaSyncSession.cs
index 44e2e02f0c..425cb3cb54 100644
--- a/libs/cluster/Server/Replication/PrimaryOps/ReplicaSyncSession.cs
+++ b/libs/cluster/Server/Replication/PrimaryOps/ReplicaSyncSession.cs
@@ -47,6 +47,7 @@ public void Dispose()
         public async Task SendCheckpoint()
         {
             errorMsg = default;
+            var retryCount = 0;
             var storeCkptManager = clusterProvider.GetReplicationLogCheckpointManager(StoreType.Main);
             var objectStoreCkptManager = clusterProvider.GetReplicationLogCheckpointManager(StoreType.Object);
             var current = clusterProvider.clusterManager.CurrentConfig;
@@ -96,6 +97,8 @@ public async Task SendCheckpoint()
                 {
                     localEntry.RemoveReader();
                     _ = Thread.Yield();
+                    if (retryCount++ > 10)
+                        throw new GarnetException("Attaching replica maximum retry count reached!");
                     goto retry;
                 }
             }
@@ -110,6 +113,8 @@ public async Task SendCheckpoint()
                 {
                     localEntry.RemoveReader();
                     _ = Thread.Yield();
+                    if (retryCount++ > 10)
+                        throw new GarnetException("Attaching replica maximum retry count reached!");
                     goto retry;
                 }
             }
@@ -187,7 +192,7 @@ public async Task SendCheckpoint()
             var beginAddress = RecoveredReplicationOffset;
             if (!recoverFromRemote)
             {
-                //If replica is ahead of this primary it will force itself to forget and start syncing from RecoveredReplicationOffset
+                // If the replica is ahead of this primary, it will force itself to forget and start syncing from RecoveredReplicationOffset
                 if (replicaAofBeginAddress > ReplicationManager.kFirstValidAofAddress && replicaAofBeginAddress > RecoveredReplicationOffset)
                 {
                     logger?.LogInformation(
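Note: the two ReplicaSyncSession hunks above turn an unbounded goto-retry loop into a bounded one. A minimal standalone sketch of the same pattern; the names TryAddReader and kMaxRetries are hypothetical stand-ins, and a plain InvalidOperationException replaces Garnet's internal GarnetException:

    using System;
    using System.Threading;

    static class BoundedRetry
    {
        const int kMaxRetries = 10;

        // Spin until the reader slot is acquired, but give up after kMaxRetries
        // attempts instead of looping forever when the checkpoint stays busy.
        public static void Acquire(Func<bool> tryAddReader)
        {
            var retryCount = 0;
        retry:
            if (!tryAddReader())
            {
                _ = Thread.Yield();
                if (retryCount++ > kMaxRetries)
                    throw new InvalidOperationException("Attaching replica maximum retry count reached!");
                goto retry;
            }
        }
    }
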
diff --git a/libs/server/Resp/ArrayCommands.cs b/libs/server/Resp/ArrayCommands.cs
index 1e05e89854..d430740138 100644
--- a/libs/server/Resp/ArrayCommands.cs
+++ b/libs/server/Resp/ArrayCommands.cs
@@ -668,15 +668,25 @@ private bool NetworkSELECT(byte* ptr)
 
             readHead = (int)(ptr - recvBufferPtr);
 
-            if (string.Equals(result, "0"))
+            if (storeWrapper.serverOptions.EnableCluster)
             {
-                while (!RespWriteUtils.WriteDirect(CmdStrings.RESP_OK, ref dcurr, dend))
+                // Cluster mode does not allow DBID
+                while (!RespWriteUtils.WriteError(CmdStrings.RESP_ERR_GENERIC_SELECT_CLUSTER_MODE, ref dcurr, dend))
                     SendAndReset();
             }
             else
             {
-                while (!RespWriteUtils.WriteError("ERR invalid database index."u8, ref dcurr, dend))
-                    SendAndReset();
+
+                if (string.Equals(result, "0"))
+                {
+                    while (!RespWriteUtils.WriteDirect(CmdStrings.RESP_OK, ref dcurr, dend))
+                        SendAndReset();
+                }
+                else
+                {
+                    while (!RespWriteUtils.WriteError(CmdStrings.RESP_ERR_GENERIC_SELECT_INVALID_INDEX, ref dcurr, dend))
+                        SendAndReset();
+                }
             }
             return true;
         }
@@ -728,7 +738,6 @@ private bool NetworkKEYS<TGarnetApi>(int count, byte* ptr, ref TGarnetApi storag
             return true;
         }
 
-
         private bool NetworkSCAN<TGarnetApi>(int count, byte* ptr, ref TGarnetApi storageApi)
             where TGarnetApi : IGarnetApi
         {
diff --git a/libs/server/Resp/CmdStrings.cs b/libs/server/Resp/CmdStrings.cs
index af3a386e04..9b3fd23c8b 100644
--- a/libs/server/Resp/CmdStrings.cs
+++ b/libs/server/Resp/CmdStrings.cs
@@ -124,6 +124,9 @@ public static ReadOnlySpan<byte> GetConfig(ReadOnlySpan<byte> key)
         public static ReadOnlySpan<byte> RESP_ERR_GENERIC_VALUE_IS_NOT_INTEGER => "ERR value is not an integer or out of range."u8;
         public static ReadOnlySpan<byte> RESP_ERR_GENERIC_UKNOWN_SUBCOMMAND => "ERR Unknown subcommand. Try LATENCY HELP."u8;
        public static ReadOnlySpan<byte> RESP_ERR_GENERIC_INDEX_OUT_RANGE => "ERR index out of range"u8;
+        public static ReadOnlySpan<byte> RESP_ERR_GENERIC_SELECT_INVALID_INDEX => "ERR invalid database index."u8;
+        public static ReadOnlySpan<byte> RESP_ERR_GENERIC_SELECT_CLUSTER_MODE => "ERR SELECT is not allowed in cluster mode"u8;
+
         /// <summary>
         /// Response string templates
         /// </summary>
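Note: NetworkSELECT now branches on cluster mode before it looks at the index, so the decision order matters. A condensed sketch of that order; SelectOutcome and HandleSelect are illustrative names, not Garnet's API:

    enum SelectOutcome { Ok, ClusterModeError, InvalidIndexError }

    static class SelectLogic
    {
        // The cluster check comes first: as written in the hunk above, any SELECT
        // (even index 0) gets the cluster-mode error when EnableCluster is set.
        public static SelectOutcome HandleSelect(bool clusterEnabled, string index)
            => clusterEnabled ? SelectOutcome.ClusterModeError
             : index == "0" ? SelectOutcome.Ok
             : SelectOutcome.InvalidIndexError;
    }
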
diff --git a/libs/server/StoreWrapper.cs b/libs/server/StoreWrapper.cs
index 06c4caaafe..bf4ebf5d93 100644
--- a/libs/server/StoreWrapper.cs
+++ b/libs/server/StoreWrapper.cs
@@ -220,7 +220,8 @@ internal void Recover()
             {
                 storeVersion = !recoverMainStoreFromToken ? store.Recover() : store.Recover(storeIndexToken, storeHlogToken);
                 if (objectStore != null) objectStoreVersion = !recoverObjectStoreFromToken ? objectStore.Recover() : objectStore.Recover(objectStoreIndexToken, objectStoreHlogToken);
-                lastSaveTime = DateTimeOffset.UtcNow;
+                if (storeVersion > 0 || objectStoreVersion > 0)
+                    lastSaveTime = DateTimeOffset.UtcNow;
             }
             catch (Exception ex)
             {
@@ -269,8 +270,8 @@ public long ReplayAOF(long untilAddress = -1)
             long replicationOffset = 0;
             try
             {
-                //When replaying AOF we do not want to write record again to AOF.
-                //So initialize local AofProcessor with recordToAof: false.
+                // When replaying the AOF we do not want to write the records back to the AOF,
+                // so initialize the local AofProcessor with recordToAof: false.
                 var aofProcessor = new AofProcessor(this, recordToAof: false, logger);
                 aofProcessor.Recover(untilAddress);
                 aofProcessor.Dispose();
@@ -580,22 +581,22 @@ void CompleteCheckpoint()
         /// <summary>
         /// Take a checkpoint if no checkpoint was taken after the provided time offset
        /// </summary>
-        /// <param name="afterTime"></param>
+        /// <param name="entryTime"></param>
         /// <returns></returns>
-        public async Task TakeOnDemandCheckpoint(DateTimeOffset afterTime)
+        public async Task TakeOnDemandCheckpoint(DateTimeOffset entryTime)
         {
-            //Take lock to ensure not other task will be taking a checkpoint
+            // Take lock to ensure no other task will be taking a checkpoint
             while (!StartCheckpoint())
                 await Task.Yield();
 
-            //If an external task has taken a checkpoint after the provided afterTime return
-            if (this.lastSaveTime > afterTime)
+            // If an external task has taken a checkpoint beyond the provided entryTime return
+            if (this.lastSaveTime > entryTime)
             {
                 CompleteCheckpoint();
                 return;
             }
 
-            //If no newer checkpoint was taken compared to the provided afterTime take a checkpoint
+            // Necessary to take a checkpoint because the latest checkpoint is before entryTime
             await CheckpointTask(StoreType.All, logger: logger);
         }
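Note: the TakeOnDemandCheckpoint change makes the checkpoint conditional: after acquiring the checkpoint lock, it returns early if another task already checkpointed past the caller's entry time. A reduced, self-contained sketch of that control flow under simplified assumptions (an Interlocked flag standing in for Garnet's StartCheckpoint/CompleteCheckpoint, and the checkpoint work passed in as a delegate):

    using System;
    using System.Threading;
    using System.Threading.Tasks;

    class CheckpointCoordinator
    {
        int checkpointInProgress;            // 0 = free, 1 = taken
        DateTimeOffset lastSaveTime;         // set whenever a checkpoint completes

        bool TryStartCheckpoint() => Interlocked.CompareExchange(ref checkpointInProgress, 1, 0) == 0;
        void CompleteCheckpoint() => Interlocked.Exchange(ref checkpointInProgress, 0);

        public async Task TakeOnDemandCheckpoint(DateTimeOffset entryTime, Func<Task> doCheckpoint)
        {
            while (!TryStartCheckpoint())    // serialize checkpoint takers
                await Task.Yield();

            if (lastSaveTime > entryTime)    // a newer checkpoint already exists
            {
                CompleteCheckpoint();        // release the lock and skip the work
                return;
            }

            await doCheckpoint();            // latest checkpoint predates entryTime
            lastSaveTime = DateTimeOffset.UtcNow;
            CompleteCheckpoint();
        }
    }
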
diff --git a/test/Garnet.test.cluster/ClusterReplicationTests.cs b/test/Garnet.test.cluster/ClusterReplicationTests.cs
index 414434d4c9..eeed0c9378 100644
--- a/test/Garnet.test.cluster/ClusterReplicationTests.cs
+++ b/test/Garnet.test.cluster/ClusterReplicationTests.cs
@@ -236,6 +236,9 @@ public void ClusterSRPrimaryCheckpoint([Values] bool performRMW, [Values] bool d
                 context.PopulatePrimary(ref context.kvPairs, keyLength, kvpairCount, 0);
             else
                 context.PopulatePrimaryRMW(ref context.kvPairs, keyLength, kvpairCount, 0, addCount);
+
+            var primaryLastSaveTime = context.clusterTestUtils.LastSave(0, logger: context.logger);
+            var replicaLastSaveTime = context.clusterTestUtils.LastSave(1, logger: context.logger);
             context.clusterTestUtils.Checkpoint(0, logger: context.logger);
 
             // Populate Primary
@@ -243,8 +246,8 @@ public void ClusterSRPrimaryCheckpoint([Values] bool performRMW, [Values] bool d
             context.ValidateKVCollectionAgainstReplica(ref context.kvPairs, 1);
 
             context.clusterTestUtils.WaitForReplicaAofSync(0, 1, context.logger);
-            context.clusterTestUtils.WaitFirstCheckpoint(0, context.logger);
-            context.clusterTestUtils.WaitFirstCheckpoint(1, context.logger);
+            context.clusterTestUtils.WaitCheckpoint(0, primaryLastSaveTime, logger: context.logger);
+            context.clusterTestUtils.WaitCheckpoint(1, replicaLastSaveTime, logger: context.logger);
 
             // Shutdown secondary
             context.nodes[1].Store.CommitAOF(true);
@@ -599,9 +602,11 @@ public void ClusterReplicationSimpleFailover([Values] bool performRMW, [Values]
 
             if (checkpoint)
             {
+                var primaryLastSaveTime = context.clusterTestUtils.LastSave(0, logger: context.logger);
+                var replicaLastSaveTime = context.clusterTestUtils.LastSave(1, logger: context.logger);
                 context.clusterTestUtils.Checkpoint(0);
-                context.clusterTestUtils.WaitFirstCheckpoint(0, logger: context.logger);
-                context.clusterTestUtils.WaitFirstCheckpoint(1, logger: context.logger);
+                context.clusterTestUtils.WaitCheckpoint(0, primaryLastSaveTime, logger: context.logger);
+                context.clusterTestUtils.WaitCheckpoint(1, replicaLastSaveTime, logger: context.logger);
             }
 
             #region InitiateFailover
@@ -667,8 +672,9 @@ public void ClusterFailoverAttachReplicas([Values] bool performRMW, [Values] boo
 
             if (takePrimaryCheckpoint)
             {
+                var primaryLastSaveTime = context.clusterTestUtils.LastSave(0, logger: context.logger);
                 context.clusterTestUtils.Checkpoint(0, logger: context.logger);
-                context.clusterTestUtils.WaitFirstCheckpoint(0, logger: context.logger);
+                context.clusterTestUtils.WaitCheckpoint(0, primaryLastSaveTime, logger: context.logger);
             }
 
             // Wait for replication offsets to synchronize
@@ -692,8 +698,9 @@ public void ClusterFailoverAttachReplicas([Values] bool performRMW, [Values] boo
 
             if (takeNewPrimaryCheckpoint)
             {
+                var newPrimaryLastSaveTime = context.clusterTestUtils.LastSave(1, logger: context.logger);
                 context.clusterTestUtils.Checkpoint(1, logger: context.logger);
-                context.clusterTestUtils.WaitFirstCheckpoint(1, logger: context.logger);
+                context.clusterTestUtils.WaitCheckpoint(1, newPrimaryLastSaveTime, logger: context.logger);
             }
             context.clusterTestUtils.WaitForReplicaAofSync(1, 2, context.logger);
@@ -912,11 +919,19 @@ void ClusterDivergentReplicasTest(bool performRMW, bool disableObjects, bool ckp
             }
             else
                 context.PopulatePrimaryWithObjects(ref context.kvPairsObj, keyLength, kvpairCount, primaryIndex: oldPrimaryIndex, set: set);
-            if (ckptBeforeDivergence) context.clusterTestUtils.Checkpoint(oldPrimaryIndex, logger: context.logger);
+            if (ckptBeforeDivergence)
+            {
+                var oldPrimaryLastSaveTime = context.clusterTestUtils.LastSave(oldPrimaryIndex, logger: context.logger);
+                var newPrimaryLastSaveTime = context.clusterTestUtils.LastSave(newPrimaryIndex, logger: context.logger);
+                var replicaLastSaveTime = context.clusterTestUtils.LastSave(replicaIndex, logger: context.logger);
+                context.clusterTestUtils.Checkpoint(oldPrimaryIndex, logger: context.logger);
+                context.clusterTestUtils.WaitCheckpoint(oldPrimaryIndex, oldPrimaryLastSaveTime, logger: context.logger);
+                context.clusterTestUtils.WaitCheckpoint(newPrimaryIndex, newPrimaryLastSaveTime, logger: context.logger);
+                context.clusterTestUtils.WaitCheckpoint(replicaIndex, replicaLastSaveTime, logger: context.logger);
+            }
+
             context.clusterTestUtils.WaitForReplicaAofSync(oldPrimaryIndex, newPrimaryIndex, context.logger);
             context.clusterTestUtils.WaitForReplicaAofSync(oldPrimaryIndex, replicaIndex, context.logger);
-            context.clusterTestUtils.WaitFirstCheckpoint(newPrimaryIndex, logger: context.logger);
-            context.clusterTestUtils.WaitFirstCheckpoint(replicaIndex, logger: context.logger);
 
             // Make this replica of no-one
             _ = context.clusterTestUtils.ReplicaOf(1, logger: context.logger);
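Note: every test change above follows one pattern: record LASTSAVE before triggering a checkpoint, then block until the node's LASTSAVE reaches that recorded time, rather than waiting only for the first-ever checkpoint. The same pattern in miniature against StackExchange.Redis (the client the test utilities use); the checkpoint delegate and the sleep are crude stand-ins for the utilities' Checkpoint and BackOff helpers:

    using System;
    using System.Threading;
    using StackExchange.Redis;

    static class CheckpointWait
    {
        // Record LASTSAVE before the checkpoint, then poll until it advances to at
        // least that time, mirroring WaitCheckpoint's lastSaveTime >= time test.
        public static void CheckpointAndWait(IServer server, Action checkpoint)
        {
            var before = server.LastSave();
            checkpoint();                      // e.g. the test utils' Checkpoint(...)
            while (server.LastSave() < before)
                Thread.Sleep(100);             // stand-in for BackOff()
        }
    }
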
diff --git a/test/Garnet.test.cluster/ClusterTestUtils.cs b/test/Garnet.test.cluster/ClusterTestUtils.cs
index 1964a462ce..c0531be2b5 100644
--- a/test/Garnet.test.cluster/ClusterTestUtils.cs
+++ b/test/Garnet.test.cluster/ClusterTestUtils.cs
@@ -2647,16 +2647,39 @@ public void Checkpoint(IPEndPoint endPoint, ILogger logger = null)
             }
         }
 
-        public void WaitFirstCheckpoint(int nodeIndex, ILogger logger = null)
-            => WaitCheckpoint((IPEndPoint)endpoints[nodeIndex], logger: logger);
+        public DateTime LastSave(int nodeIndex, ILogger logger = null)
+            => LastSave((IPEndPoint)endpoints[nodeIndex], logger: logger);
 
-        public void WaitCheckpoint(IPEndPoint endPoint, ILogger logger = null)
+        public DateTime LastSave(IPEndPoint endPoint, ILogger logger = null)
         {
             try
             {
                 var server = redis.GetServer(endPoint);
-                while (server.LastSave().Ticks == DateTimeOffset.FromUnixTimeSeconds(0).Ticks)
+                return server.LastSave();
+            }
+            catch (Exception ex)
+            {
+                logger?.LogError(ex, "An error has occurred; LastSave");
+                Assert.Fail();
+            }
+            return default;
+        }
+
+        public void WaitCheckpoint(int nodeIndex, DateTime time, ILogger logger = null)
+            => WaitCheckpoint((IPEndPoint)endpoints[nodeIndex], time: time, logger: logger);
+
+        public void WaitCheckpoint(IPEndPoint endPoint, DateTime time, ILogger logger = null)
+        {
+            try
+            {
+                var server = redis.GetServer(endPoint);
+                while (true)
+                {
+                    var lastSaveTime = server.LastSave();
+                    if (lastSaveTime >= time)
+                        break;
                     BackOff();
+                }
             }
             catch (Exception ex)
             {