diff --git a/hosting/Windows/Garnet.worker/Worker.cs b/hosting/Windows/Garnet.worker/Worker.cs index d69adb7e3c0..7f8839a55ee 100644 --- a/hosting/Windows/Garnet.worker/Worker.cs +++ b/hosting/Windows/Garnet.worker/Worker.cs @@ -43,8 +43,26 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken) /// Indicates that the shutdown process should no longer be graceful. public override async Task StopAsync(CancellationToken cancellationToken) { - Dispose(); - await base.StopAsync(cancellationToken).ConfigureAwait(false); + try + { + if (server != null) + { + // If cancellation is requested, we will skip the graceful shutdown and proceed to dispose immediately + bool isForceShutdown = cancellationToken.IsCancellationRequested; + // Perform graceful shutdown with AOF commit and checkpoint when not forced Shutdown From OS. + await server.ShutdownAsync(timeout: TimeSpan.FromSeconds(5), noSave: isForceShutdown, token: cancellationToken).ConfigureAwait(false); + } + } + catch (OperationCanceledException) + { + // Force shutdown requested - proceed to dispose + } + finally + { + // Ensure base class cleanup although cancellationToken is cancelled + await base.StopAsync(CancellationToken.None).ConfigureAwait(false); + Dispose(); + } } public override void Dispose() @@ -55,6 +73,8 @@ public override void Dispose() } server?.Dispose(); _isDisposed = true; + base.Dispose(); + GC.SuppressFinalize(this); } } } \ No newline at end of file diff --git a/libs/host/GarnetServer.cs b/libs/host/GarnetServer.cs index 6dbbad160b0..f24ba0c1fe5 100644 --- a/libs/host/GarnetServer.cs +++ b/libs/host/GarnetServer.cs @@ -10,6 +10,7 @@ using System.Runtime.InteropServices; using System.Text; using System.Threading; +using System.Threading.Tasks; using Garnet.cluster; using Garnet.common; using Garnet.networking; @@ -450,6 +451,201 @@ public void Start() Console.WriteLine("* Ready to accept connections"); } + /// + /// Performs graceful shutdown of the server. + /// Stops accepting new connections, waits for active connections to complete, commits AOF, and takes checkpoint if needed. + /// + /// Timeout for waiting on active connections (default: 30 seconds) + /// If true, skip data persistence (AOF commit and checkpoint) during shutdown + /// Cancellation token + /// Task representing the async shutdown operation + public async Task ShutdownAsync(TimeSpan? timeout = null, bool noSave = false, CancellationToken token = default) + { + var shutdownTimeout = timeout ?? TimeSpan.FromSeconds(30); + + try + { + // Stop accepting new connections first + StopListening(); + + // Wait for existing connections to complete (cancellable) + try + { + await WaitForActiveConnectionsAsync(shutdownTimeout, token).ConfigureAwait(false); + } + catch (OperationCanceledException) + { + logger?.LogWarning("Connection draining was cancelled. Proceeding with data finalization..."); + } + } + catch (Exception ex) + { + logger?.LogError(ex, "Error during graceful shutdown"); + } + finally + { + if (!noSave) + { + // Attempt AOF commit or checkpoint as best-effort, + // even if connection draining was cancelled or failed. + // Use a bounded timeout instead of the caller's token to ensure completion. + using var finalizeCts = new CancellationTokenSource(TimeSpan.FromSeconds(15)); + try + { + await FinalizeDataAsync(finalizeCts.Token).ConfigureAwait(false); + } + catch (Exception ex) + { + logger?.LogError(ex, "Error during data finalization"); + } + } + else + { + logger?.LogInformation("Shutdown with noSave flag - skipping data persistence."); + } + } + } + + /// + /// Stop all servers from accepting new connections. + /// + private void StopListening() + { + if (servers == null) return; + + logger?.LogDebug("Stopping listeners to prevent new connections..."); + foreach (var server in servers) + { + try + { + server?.StopListening(); + } + catch (Exception ex) + { + logger?.LogWarning(ex, "Error stopping listener"); + } + } + } + + /// + /// Waits for active connections to complete within the specified timeout. + /// + private async Task WaitForActiveConnectionsAsync(TimeSpan timeout, CancellationToken token) + { + if (servers == null) return; + + // Linked Token : between external token and timeout + using var cts = CancellationTokenSource.CreateLinkedTokenSource(token); + cts.CancelAfter(timeout); + + var delays = new[] { 50, 300, 1000 }; + var delayIndex = 0; + + try + { + while (!cts.Token.IsCancellationRequested) + { + var activeConnections = GetActiveConnectionCount(); + if (activeConnections == 0) + { + logger?.LogInformation("All connections have been closed gracefully."); + return; + } + + logger?.LogInformation("Waiting for {ActiveConnections} active connections to complete...", activeConnections); + + var currentDelay = delays[delayIndex]; + if (delayIndex < delays.Length - 1) delayIndex++; + + await Task.Delay(currentDelay, cts.Token).ConfigureAwait(false); + } + } + catch (OperationCanceledException) when (token.IsCancellationRequested) + { + throw; + } + catch (OperationCanceledException) + { + // timeout reached error logging + logger?.LogWarning("Timeout reached after {TimeoutSeconds} seconds. Some connections may still be active.", + timeout.TotalSeconds); + } + catch (Exception ex) + { + logger?.LogWarning(ex, "Error checking active connections"); + await Task.Delay(500, token).ConfigureAwait(false); + } + } + + /// + /// Gets the current number of active connections directly from server instances. + /// + private long GetActiveConnectionCount() + { + long count = 0; + if (servers != null) + { + foreach (var garnetServer in servers) + { + if (garnetServer is GarnetServerBase garnetServerBase) + { + count += garnetServerBase.get_conn_active(); + } + } + } + return count; + } + + /// + /// Persists data during shutdown using AOF or checkpoint based on configuration. + /// + private async Task FinalizeDataAsync(CancellationToken token) + { + if (opts.EnableAOF) + { + logger?.LogDebug("Committing AOF before shutdown..."); + try + { + var commitSuccess = await Store.CommitAOFAsync(token).ConfigureAwait(false); + if (commitSuccess) + { + logger?.LogDebug("AOF committed successfully."); + } + else + { + logger?.LogInformation("AOF commit skipped (another commit in progress or replica mode)."); + } + } + catch (Exception ex) + { + logger?.LogError(ex, "Error committing AOF during shutdown"); + } + + return; + } + + if (!opts.EnableStorageTier) + return; + + logger?.LogDebug("Taking checkpoint for tiered storage..."); + try + { + var checkpointSuccess = Store.TakeCheckpoint(background: false, token: token); + if (checkpointSuccess) + { + logger?.LogDebug("Checkpoint completed successfully."); + } + else + { + logger?.LogInformation("Checkpoint skipped (another checkpoint in progress or replica mode)."); + } + } + catch (Exception ex) + { + logger?.LogError(ex, "Error taking checkpoint during shutdown"); + } + } + /// /// Dispose store (including log and checkpoint directory) /// diff --git a/libs/server/Servers/GarnetServerBase.cs b/libs/server/Servers/GarnetServerBase.cs index 5bfd1ff62ff..7f6386ac523 100644 --- a/libs/server/Servers/GarnetServerBase.cs +++ b/libs/server/Servers/GarnetServerBase.cs @@ -154,6 +154,12 @@ public bool AddSession(WireFormat protocol, ref ISessionProvider provider, INetw /// public abstract void Start(); + /// + public virtual void StopListening() + { + // Base implementation does nothing; derived classes should override + } + /// public virtual void Dispose() { diff --git a/libs/server/Servers/GarnetServerTcp.cs b/libs/server/Servers/GarnetServerTcp.cs index c681e09befa..b7485724dda 100644 --- a/libs/server/Servers/GarnetServerTcp.cs +++ b/libs/server/Servers/GarnetServerTcp.cs @@ -121,6 +121,22 @@ public override void Start() AcceptEventArg_Completed(null, acceptEventArg); } + /// + public override void StopListening() + { + try + { + // Close the listen socket to stop accepting new connections + // This will cause any pending AcceptAsync to complete with an error + listenSocket.Close(); + logger?.LogDebug("Stopped accepting new connections on {endpoint}", EndPoint); + } + catch (Exception ex) + { + logger?.LogDebug(ex, "Error closing listen socket on {endpoint}", EndPoint); + } + } + private void AcceptEventArg_Completed(object sender, SocketAsyncEventArgs e) { try diff --git a/libs/server/Servers/IGarnetServer.cs b/libs/server/Servers/IGarnetServer.cs index 9e197451627..639e4a2a259 100644 --- a/libs/server/Servers/IGarnetServer.cs +++ b/libs/server/Servers/IGarnetServer.cs @@ -46,5 +46,11 @@ public interface IGarnetServer : IDisposable /// Start server /// public void Start(); + + /// + /// Stop accepting new connections (for graceful shutdown). + /// Existing connections remain active until they complete or are disposed. + /// + public void StopListening(); } } \ No newline at end of file diff --git a/libs/server/Servers/StoreApi.cs b/libs/server/Servers/StoreApi.cs index 5ff169c9fd5..2ed70ef7a3e 100644 --- a/libs/server/Servers/StoreApi.cs +++ b/libs/server/Servers/StoreApi.cs @@ -130,6 +130,35 @@ public bool FlushDB(int dbId = 0, bool unsafeTruncateLog = false) } } + /// + /// Take checkpoint for all active databases + /// + /// True if method can return before checkpoint is taken + /// Cancellation token + /// false if checkpoint was skipped due to node state or another checkpoint in progress + public bool TakeCheckpoint(bool background = false, CancellationToken token = default) + { + using (PreventRoleChange(out var acquired)) + { + if (!acquired || IsReplica) + { + return false; + } + + return storeWrapper.TakeCheckpoint(background, logger: null, token: token); + } + } + + /// + /// Check if storage tier is enabled + /// + public bool IsStorageTierEnabled => storeWrapper.serverOptions.EnableStorageTier; + + /// + /// Check if AOF is enabled + /// + public bool IsAOFEnabled => storeWrapper.serverOptions.EnableAOF; + /// /// Helper to disable role changes during a using block. /// diff --git a/main/GarnetServer/Program.cs b/main/GarnetServer/Program.cs index 7b2673ebc41..1b145f2a83d 100644 --- a/main/GarnetServer/Program.cs +++ b/main/GarnetServer/Program.cs @@ -22,7 +22,18 @@ static void Main(string[] args) // Start the server server.Start(); - Thread.Sleep(Timeout.Infinite); + using var shutdownEvent = new ManualResetEventSlim(false); + + Console.CancelKeyPress += (sender, e) => + { + e.Cancel = true; + // Graceful shutdown: drain connections, commit AOF, take checkpoint + server.ShutdownAsync(TimeSpan.FromSeconds(5)) + .GetAwaiter().GetResult(); + shutdownEvent.Set(); + }; + + shutdownEvent.Wait(); } catch (Exception ex) { diff --git a/test/Garnet.test/GarnetServerTcpTests.cs b/test/Garnet.test/GarnetServerTcpTests.cs new file mode 100644 index 00000000000..9591243a938 --- /dev/null +++ b/test/Garnet.test/GarnetServerTcpTests.cs @@ -0,0 +1,209 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT license. + +using System; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using Allure.NUnit; +using Garnet.server; +using NUnit.Framework; +using NUnit.Framework.Legacy; +using StackExchange.Redis; + +namespace Garnet.test +{ + [AllureNUnit] + [TestFixture, NonParallelizable] + public class GarnetServerTcpTests : AllureTestBase + { + private GarnetServer server; + + [SetUp] + public void Setup() + { + TestUtils.DeleteDirectory(TestUtils.MethodTestDir, wait: true); + server = TestUtils.CreateGarnetServer(TestUtils.MethodTestDir); + server.Start(); + } + + [TearDown] + public void TearDown() + { + server?.Dispose(); + TestUtils.DeleteDirectory(TestUtils.MethodTestDir); + } + + [Test] + public void StopListeningPreventsNewConnections() + { + // Arrange - Establish a working connection first + using var redis1 = ConnectionMultiplexer.Connect(TestUtils.GetConfig()); + var db1 = redis1.GetDatabase(0); + db1.StringSet("test", "value"); + ClassicAssert.AreEqual("value", (string)db1.StringGet("test")); + + // Act - Stop listening on all servers + foreach (var tcpServer in server.Provider.StoreWrapper.Servers.OfType()) + { + tcpServer.StopListening(); + } + + Thread.Sleep(100); // Brief delay to ensure socket is closed + + // Assert - New connections should fail + Assert.Throws(() => + { + using var redis2 = ConnectionMultiplexer.Connect(TestUtils.GetConfig()); + redis2.GetDatabase(0).Ping(); + }); + + // Existing connection should still work + ClassicAssert.AreEqual("value", (string)db1.StringGet("test")); + } + + [Test] + public void StopListeningIdempotent() + { + // Arrange + foreach (var tcpServer in server.Provider.StoreWrapper.Servers.OfType()) + { + tcpServer.StopListening(); + } + + // Act & Assert - Calling StopListening again should not throw + Assert.DoesNotThrow(() => + { + foreach (var tcpServer in server.Provider.StoreWrapper.Servers.OfType()) + { + tcpServer.StopListening(); + } + }); + } + + [Test] + public async Task StopListeningDuringActiveConnectionAttempts() + { + // Arrange - Start multiple connection attempts + var connectionTasks = new System.Collections.Generic.List(); + using var cts = new CancellationTokenSource(); + + for (int i = 0; i < 10; i++) + { + connectionTasks.Add(Task.Run(async () => + { + while (!cts.Token.IsCancellationRequested) + { + try + { + using var redis = ConnectionMultiplexer.Connect(TestUtils.GetConfig()); + await redis.GetDatabase(0).PingAsync(); + await Task.Delay(10); + } + catch + { + // Connection failures are expected after StopListening + } + } + }, cts.Token)); + } + + await Task.Delay(50); // Let some connections establish + + // Act + foreach (var tcpServer in server.Provider.StoreWrapper.Servers.OfType()) + { + tcpServer.StopListening(); + } + + await Task.Delay(100); + cts.Cancel(); + + // Assert - All tasks should complete without unhandled exceptions + Assert.DoesNotThrowAsync(async () => await Task.WhenAll(connectionTasks)); + } + + [Test] + public async Task ShutdownAsyncCompletesGracefully() + { + // Arrange - Write data and then close the connection + using (var redis = ConnectionMultiplexer.Connect(TestUtils.GetConfig())) + { + var db = redis.GetDatabase(0); + db.StringSet("shutdown-test", "data"); + ClassicAssert.AreEqual("data", (string)db.StringGet("shutdown-test")); + } + + // Act - Graceful shutdown (no active connections) + await server.ShutdownAsync(timeout: TimeSpan.FromSeconds(5)).ConfigureAwait(false); + + // Assert - New connections should fail after shutdown + Assert.Throws(() => + { + using var redis2 = ConnectionMultiplexer.Connect(TestUtils.GetConfig()); + redis2.GetDatabase(0).Ping(); + }); + } + + [Test] + public async Task ShutdownAsyncRespectsTimeout() + { + // Arrange - Establish a connection that will stay open + using var redis = ConnectionMultiplexer.Connect(TestUtils.GetConfig()); + var db = redis.GetDatabase(0); + db.Ping(); + + // Act - Shutdown with a very short timeout + var sw = System.Diagnostics.Stopwatch.StartNew(); + await server.ShutdownAsync(timeout: TimeSpan.FromMilliseconds(200)).ConfigureAwait(false); + sw.Stop(); + + // Assert - Should complete without hanging indefinitely + // Allow generous upper bound for CI environments + ClassicAssert.Less(sw.ElapsedMilliseconds, 10_000, + "ShutdownAsync should complete within a reasonable time even with active connections"); + } + + [Test] + public async Task ShutdownAsyncRespectsCancellation() + { + // Arrange + using var redis = ConnectionMultiplexer.Connect(TestUtils.GetConfig()); + redis.GetDatabase(0).Ping(); + + using var cts = new CancellationTokenSource(); + + // Act - Cancel immediately + cts.Cancel(); + Assert.DoesNotThrowAsync(async () => + { + await server.ShutdownAsync(timeout: TimeSpan.FromSeconds(30), token: cts.Token).ConfigureAwait(false); + }); + } + + [Test] + public async Task ShutdownAsyncWithAofCommit() + { + // Arrange - Create server with AOF enabled + server?.Dispose(); + TestUtils.DeleteDirectory(TestUtils.MethodTestDir); + server = TestUtils.CreateGarnetServer(TestUtils.MethodTestDir, enableAOF: true); + server.Start(); + + using var redis = ConnectionMultiplexer.Connect(TestUtils.GetConfig()); + var db = redis.GetDatabase(0); + + // Write some data + for (int i = 0; i < 100; i++) + { + db.StringSet($"aof-key-{i}", $"value-{i}"); + } + + // Act - Shutdown should commit AOF without errors + Assert.DoesNotThrowAsync(async () => + { + await server.ShutdownAsync(timeout: TimeSpan.FromSeconds(5)).ConfigureAwait(false); + }); + } + } +} \ No newline at end of file diff --git a/test/Garnet.test/RespAdminCommandsTests.cs b/test/Garnet.test/RespAdminCommandsTests.cs index 3d1f801b2b9..bb8b68a8743 100644 --- a/test/Garnet.test/RespAdminCommandsTests.cs +++ b/test/Garnet.test/RespAdminCommandsTests.cs @@ -662,5 +662,126 @@ public void ConfigGetWrongNumberOfArguments() ClassicAssert.AreEqual(expectedMessage, ex.Message); } #endregion + + #region GracefulShutdownTests + [Test] + public async Task ShutdownAsyncStopsAcceptingNewConnections() + { + // Arrange - write data then close connection + using (var redis1 = ConnectionMultiplexer.Connect(TestUtils.GetConfig())) + { + var db1 = redis1.GetDatabase(0); + db1.StringSet("test", "value"); + } + + // Act - Initiate shutdown + await server.ShutdownAsync(TimeSpan.FromSeconds(5)); + + // Assert - New connections should fail + Assert.Throws(() => + { + using var redis2 = ConnectionMultiplexer.Connect(TestUtils.GetConfig()); + redis2.GetDatabase(0).Ping(); + }); + } + + [Test] + public async Task ShutdownAsyncWaitsForActiveConnections() + { + // Arrange - keep connection open to test timeout behavior + using var redis = ConnectionMultiplexer.Connect(TestUtils.GetConfig()); + var db = redis.GetDatabase(0); + db.StringSet("key1", "value1"); + + // Act - Start shutdown with short timeout; active connection will force timeout + var sw = System.Diagnostics.Stopwatch.StartNew(); + await server.ShutdownAsync(TimeSpan.FromMilliseconds(500)); + sw.Stop(); + + // Assert - Should wait approximately the timeout duration before proceeding + ClassicAssert.GreaterOrEqual(sw.ElapsedMilliseconds, 400, + "Shutdown should wait for the timeout duration when connections are active"); + ClassicAssert.Less(sw.ElapsedMilliseconds, 5_000, + "Shutdown should not hang beyond a reasonable bound"); + } + + [Test] + public async Task ShutdownAsyncCommitsAOF() + { + // Arrange + server.Dispose(); + server = TestUtils.CreateGarnetServer(TestUtils.MethodTestDir, enableAOF: true); + server.Start(); + + using (var redis = ConnectionMultiplexer.Connect(TestUtils.GetConfig(allowAdmin: true))) + { + var db = redis.GetDatabase(0); + db.StringSet("aofKey", "aofValue"); + } + + // Act - Shutdown which should commit AOF + await server.ShutdownAsync(TimeSpan.FromSeconds(5)); + server.Dispose(false); + + // Assert - Recover and verify data persisted + server = TestUtils.CreateGarnetServer(TestUtils.MethodTestDir, enableAOF: true, tryRecover: true); + server.Start(); + + using (var redis = ConnectionMultiplexer.Connect(TestUtils.GetConfig())) + { + var db = redis.GetDatabase(0); + var recoveredValue = db.StringGet("aofKey"); + ClassicAssert.AreEqual("aofValue", recoveredValue.ToString()); + } + } + + [Test] + public async Task ShutdownAsyncTakesCheckpointWhenStorageTierEnabled() + { + // Arrange + server.Dispose(); + server = TestUtils.CreateGarnetServer(TestUtils.MethodTestDir); + server.Start(); + + using (var redis = ConnectionMultiplexer.Connect(TestUtils.GetConfig(allowAdmin: true))) + { + var db = redis.GetDatabase(0); + db.StringSet("checkpointKey", "checkpointValue"); + } + + // Act - Shutdown which should take checkpoint + await server.ShutdownAsync(TimeSpan.FromSeconds(5)); + server.Dispose(false); + + // Assert - Recover from checkpoint + server = TestUtils.CreateGarnetServer(TestUtils.MethodTestDir, tryRecover: true); + server.Start(); + + using (var redis = ConnectionMultiplexer.Connect(TestUtils.GetConfig())) + { + var db = redis.GetDatabase(0); + var recoveredValue = db.StringGet("checkpointKey"); + ClassicAssert.AreEqual("checkpointValue", recoveredValue.ToString()); + } + } + + [Test] + public async Task ShutdownAsyncRespectsTimeout() + { + // Arrange - keep connection open to force timeout path + using var redis = ConnectionMultiplexer.Connect(TestUtils.GetConfig()); + var db = redis.GetDatabase(0); + db.StringSet("key", "value"); + + // Act - Shutdown with very short timeout + var sw = System.Diagnostics.Stopwatch.StartNew(); + await server.ShutdownAsync(TimeSpan.FromMilliseconds(100)); + sw.Stop(); + + // Assert - Should complete within reasonable time + ClassicAssert.Less(sw.ElapsedMilliseconds, 5_000, + $"Shutdown should complete within reasonable time. Actual: {sw.ElapsedMilliseconds}ms"); + } + #endregion } } \ No newline at end of file diff --git a/test/Garnet.test/ShutdownDataConsistencyTests.cs b/test/Garnet.test/ShutdownDataConsistencyTests.cs new file mode 100644 index 00000000000..9d42853949a --- /dev/null +++ b/test/Garnet.test/ShutdownDataConsistencyTests.cs @@ -0,0 +1,419 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT license. + +using Allure.NUnit; +using NUnit.Framework; +using NUnit.Framework.Legacy; +using StackExchange.Redis; + +namespace Garnet.test +{ + /// + /// Tests to investigate data consistency under different shutdown finalization sequences: + /// 1. Checkpoint first, then AOF commit + /// 2. AOF commit first, then Checkpoint (current production order) + /// 3. AOF commit only + /// 4. Checkpoint only + /// + /// Each test writes data to main store (string keys) and object store (sorted sets), + /// performs the finalization sequence, disposes without cleanup, recovers, and verifies data. + /// + [AllureNUnit] + [TestFixture] + public class ShutdownDataConsistencyTests : AllureTestBase + { + private GarnetServer server; + + private const int KeyCount = 50; + private const string MainStoreKeyPrefix = "shutdowntest:key:"; + private const string MainStoreValuePrefix = "shutdowntest:value:"; + private const string ObjectStoreKeyPrefix = "shutdowntest:zset:"; + + [SetUp] + public void Setup() + { + TestUtils.DeleteDirectory(TestUtils.MethodTestDir, wait: true); + } + + [TearDown] + public void TearDown() + { + server?.Dispose(); + TestUtils.DeleteDirectory(TestUtils.MethodTestDir); + } + + /// + /// Creates a server with both AOF and storage tier enabled (low memory forces spill to disk). + /// + private GarnetServer CreateServerWithAofAndStorage(bool tryRecover = false) + { + return TestUtils.CreateGarnetServer( + TestUtils.MethodTestDir, + enableAOF: true, + lowMemory: true, + tryRecover: tryRecover); + } + + /// + /// Populates main store with string key-value pairs and object store with sorted sets. + /// + private void PopulateData() + { + using var redis = ConnectionMultiplexer.Connect(TestUtils.GetConfig(allowAdmin: true)); + var db = redis.GetDatabase(0); + + // Main store: string keys + for (var i = 0; i < KeyCount; i++) + { + db.StringSet($"{MainStoreKeyPrefix}{i}", $"{MainStoreValuePrefix}{i}"); + } + + // Object store: sorted sets + for (var i = 0; i < KeyCount; i++) + { + var entries = new SortedSetEntry[] + { + new($"member_a_{i}", i * 10), + new($"member_b_{i}", i * 10 + 1), + new($"member_c_{i}", i * 10 + 2), + }; + db.SortedSetAdd($"{ObjectStoreKeyPrefix}{i}", entries); + } + } + + /// + /// Verifies all main store and object store data is recovered correctly. + /// Returns (mainStoreRecovered, objectStoreRecovered) counts. + /// + private (int mainStoreRecovered, int objectStoreRecovered) VerifyRecoveredData() + { + using var redis = ConnectionMultiplexer.Connect(TestUtils.GetConfig()); + var db = redis.GetDatabase(0); + + var mainStoreRecovered = 0; + for (var i = 0; i < KeyCount; i++) + { + var value = db.StringGet($"{MainStoreKeyPrefix}{i}"); + if (value.HasValue) + { + ClassicAssert.AreEqual($"{MainStoreValuePrefix}{i}", value.ToString(), + $"Main store key {MainStoreKeyPrefix}{i} has wrong value after recovery"); + mainStoreRecovered++; + } + } + + var objectStoreRecovered = 0; + for (var i = 0; i < KeyCount; i++) + { + var members = db.SortedSetRangeByScoreWithScores($"{ObjectStoreKeyPrefix}{i}"); + if (members.Length > 0) + { + ClassicAssert.AreEqual(3, members.Length, + $"Object store key {ObjectStoreKeyPrefix}{i} should have 3 members"); + ClassicAssert.AreEqual($"member_a_{i}", members[0].Element.ToString()); + ClassicAssert.AreEqual(i * 10, members[0].Score); + ClassicAssert.AreEqual($"member_b_{i}", members[1].Element.ToString()); + ClassicAssert.AreEqual(i * 10 + 1, members[1].Score); + ClassicAssert.AreEqual($"member_c_{i}", members[2].Element.ToString()); + ClassicAssert.AreEqual(i * 10 + 2, members[2].Score); + objectStoreRecovered++; + } + } + + return (mainStoreRecovered, objectStoreRecovered); + } + + /// + /// Scenario 1: Checkpoint → AOF commit sequence. + /// Takes checkpoint first, then commits AOF. + /// + [Test] + public void CheckpointThenAofCommit_DataConsistencyTest() + { + server = CreateServerWithAofAndStorage(); + server.Start(); + + PopulateData(); + + // Sequence: Checkpoint first, then AOF commit + server.Store.TakeCheckpoint(background: false); + server.Store.CommitAOF(spinWait: true); + + server.Dispose(false); + + // Recover and verify + server = CreateServerWithAofAndStorage(tryRecover: true); + server.Start(); + + var (mainRecovered, objRecovered) = VerifyRecoveredData(); + + TestContext.Progress.WriteLine( + $"[Checkpoint→AOF] Main store: {mainRecovered}/{KeyCount}, Object store: {objRecovered}/{KeyCount}"); + + ClassicAssert.AreEqual(KeyCount, mainRecovered, + "Checkpoint→AOF: Not all main store keys recovered"); + ClassicAssert.AreEqual(KeyCount, objRecovered, + "Checkpoint→AOF: Not all object store keys recovered"); + } + + /// + /// Scenario 2: AOF commit → Checkpoint sequence (current production order). + /// Commits AOF first, then takes checkpoint. + /// + [Test] + public void AofCommitThenCheckpoint_DataConsistencyTest() + { + server = CreateServerWithAofAndStorage(); + server.Start(); + + PopulateData(); + + // Sequence: AOF commit first, then Checkpoint (matches current FinalizeDataAsync order) + server.Store.CommitAOF(spinWait: true); + server.Store.TakeCheckpoint(background: false); + + server.Dispose(false); + + // Recover and verify + server = CreateServerWithAofAndStorage(tryRecover: true); + server.Start(); + + var (mainRecovered, objRecovered) = VerifyRecoveredData(); + + TestContext.Progress.WriteLine( + $"[AOF→Checkpoint] Main store: {mainRecovered}/{KeyCount}, Object store: {objRecovered}/{KeyCount}"); + + ClassicAssert.AreEqual(KeyCount, mainRecovered, + "AOF→Checkpoint: Not all main store keys recovered"); + ClassicAssert.AreEqual(KeyCount, objRecovered, + "AOF→Checkpoint: Not all object store keys recovered"); + } + + /// + /// Scenario 3: AOF commit only (no checkpoint). + /// Only commits AOF before shutdown. + /// + [Test] + public void AofCommitOnly_DataConsistencyTest() + { + server = CreateServerWithAofAndStorage(); + server.Start(); + + PopulateData(); + + // Sequence: AOF commit only + server.Store.CommitAOF(spinWait: true); + + server.Dispose(false); + + // Recover and verify + server = CreateServerWithAofAndStorage(tryRecover: true); + server.Start(); + + var (mainRecovered, objRecovered) = VerifyRecoveredData(); + + TestContext.Progress.WriteLine( + $"[AOF Only] Main store: {mainRecovered}/{KeyCount}, Object store: {objRecovered}/{KeyCount}"); + + ClassicAssert.AreEqual(KeyCount, mainRecovered, + "AOF Only: Not all main store keys recovered"); + ClassicAssert.AreEqual(KeyCount, objRecovered, + "AOF Only: Not all object store keys recovered"); + } + + /// + /// Scenario 4: Checkpoint only (no AOF commit). + /// Only takes checkpoint before shutdown. + /// + [Test] + public void CheckpointOnly_DataConsistencyTest() + { + server = CreateServerWithAofAndStorage(); + server.Start(); + + PopulateData(); + + // Sequence: Checkpoint only (no AOF commit) + server.Store.TakeCheckpoint(background: false); + + server.Dispose(false); + + // Recover and verify + server = CreateServerWithAofAndStorage(tryRecover: true); + server.Start(); + + var (mainRecovered, objRecovered) = VerifyRecoveredData(); + + TestContext.Progress.WriteLine( + $"[Checkpoint Only] Main store: {mainRecovered}/{KeyCount}, Object store: {objRecovered}/{KeyCount}"); + + // Note: With checkpoint only (no AOF commit), data written after the last + // checkpoint but before the AOF commit may be lost. This test documents + // the actual behavior for investigation purposes. + ClassicAssert.AreEqual(KeyCount, mainRecovered, + "Checkpoint Only: Not all main store keys recovered"); + ClassicAssert.AreEqual(KeyCount, objRecovered, + "Checkpoint Only: Not all object store keys recovered"); + } + + /// + /// Scenario 5: No finalization at all (baseline - result is configuration-dependent). + /// Neither AOF commit nor checkpoint before shutdown. + /// Recovery results may vary if data was already persisted by background AOF or storage tier spill. + /// + [Test] + public void NoFinalization_DataConsistencyTest() + { + server = CreateServerWithAofAndStorage(); + server.Start(); + + PopulateData(); + + // No explicit finalization before shutdown + server.Dispose(false); + + // Recover and verify + server = CreateServerWithAofAndStorage(tryRecover: true); + server.Start(); + + var (mainRecovered, objRecovered) = VerifyRecoveredData(); + + TestContext.Progress.WriteLine( + $"[No Finalization] Main store: {mainRecovered}/{KeyCount}, Object store: {objRecovered}/{KeyCount}"); + + // Baseline observation only: recovery depends on prior persistence behavior. + TestContext.Progress.WriteLine( + $"[No Finalization] Data loss: main store={KeyCount - mainRecovered}, object store={KeyCount - objRecovered}"); + } + + /// + /// Scenario 6: Interleaved writes with checkpoint then additional writes with AOF commit. + /// Simulates the case where new writes happen between checkpoint and AOF commit. + /// + [Test] + public void CheckpointThenMoreWritesThenAofCommit_DataConsistencyTest() + { + server = CreateServerWithAofAndStorage(); + server.Start(); + + // Phase 1: Initial data + PopulateData(); + + // Take checkpoint + server.Store.TakeCheckpoint(background: false); + + // Phase 2: Write additional data AFTER checkpoint + using (var redis = ConnectionMultiplexer.Connect(TestUtils.GetConfig())) + { + var db = redis.GetDatabase(0); + for (var i = KeyCount; i < KeyCount * 2; i++) + { + db.StringSet($"{MainStoreKeyPrefix}{i}", $"{MainStoreValuePrefix}{i}"); + } + } + + // Now commit AOF (should capture phase 2 writes) + server.Store.CommitAOF(spinWait: true); + + server.Dispose(false); + + // Recover and verify + server = CreateServerWithAofAndStorage(tryRecover: true); + server.Start(); + + using (var redis = ConnectionMultiplexer.Connect(TestUtils.GetConfig())) + { + var db = redis.GetDatabase(0); + + var phase1Recovered = 0; + for (var i = 0; i < KeyCount; i++) + { + var value = db.StringGet($"{MainStoreKeyPrefix}{i}"); + if (value.HasValue && value.ToString() == $"{MainStoreValuePrefix}{i}") + phase1Recovered++; + } + + var phase2Recovered = 0; + for (var i = KeyCount; i < KeyCount * 2; i++) + { + var value = db.StringGet($"{MainStoreKeyPrefix}{i}"); + if (value.HasValue && value.ToString() == $"{MainStoreValuePrefix}{i}") + phase2Recovered++; + } + + TestContext.Progress.WriteLine( + $"[Checkpoint→Writes→AOF] Phase1: {phase1Recovered}/{KeyCount}, Phase2 (post-checkpoint): {phase2Recovered}/{KeyCount}"); + + ClassicAssert.AreEqual(KeyCount, phase1Recovered, + "Checkpoint→Writes→AOF: Not all phase 1 keys recovered"); + ClassicAssert.AreEqual(KeyCount, phase2Recovered, + "Checkpoint→Writes→AOF: Not all phase 2 (post-checkpoint) keys recovered"); + } + } + + /// + /// Scenario 7: AOF commit then additional writes then checkpoint. + /// Simulates the case where new writes happen between AOF commit and checkpoint. + /// + [Test] + public void AofCommitThenMoreWritesThenCheckpoint_DataConsistencyTest() + { + server = CreateServerWithAofAndStorage(); + server.Start(); + + // Phase 1: Initial data + PopulateData(); + + // Commit AOF + server.Store.CommitAOF(spinWait: true); + + // Phase 2: Write additional data AFTER AOF commit + using (var redis = ConnectionMultiplexer.Connect(TestUtils.GetConfig())) + { + var db = redis.GetDatabase(0); + for (var i = KeyCount; i < KeyCount * 2; i++) + { + db.StringSet($"{MainStoreKeyPrefix}{i}", $"{MainStoreValuePrefix}{i}"); + } + } + + // Now take checkpoint (should capture phase 2 writes) + server.Store.TakeCheckpoint(background: false); + + server.Dispose(false); + + // Recover and verify + server = CreateServerWithAofAndStorage(tryRecover: true); + server.Start(); + + using (var redis = ConnectionMultiplexer.Connect(TestUtils.GetConfig())) + { + var db = redis.GetDatabase(0); + + var phase1Recovered = 0; + for (var i = 0; i < KeyCount; i++) + { + var value = db.StringGet($"{MainStoreKeyPrefix}{i}"); + if (value.HasValue && value.ToString() == $"{MainStoreValuePrefix}{i}") + phase1Recovered++; + } + + var phase2Recovered = 0; + for (var i = KeyCount; i < KeyCount * 2; i++) + { + var value = db.StringGet($"{MainStoreKeyPrefix}{i}"); + if (value.HasValue && value.ToString() == $"{MainStoreValuePrefix}{i}") + phase2Recovered++; + } + + TestContext.Progress.WriteLine( + $"[AOF→Writes→Checkpoint] Phase1: {phase1Recovered}/{KeyCount}, Phase2 (post-AOF): {phase2Recovered}/{KeyCount}"); + + ClassicAssert.AreEqual(KeyCount, phase1Recovered, + "AOF→Writes→Checkpoint: Not all phase 1 keys recovered"); + ClassicAssert.AreEqual(KeyCount, phase2Recovered, + "AOF→Writes→Checkpoint: Not all phase 2 (post-AOF) keys recovered"); + } + } + } +} \ No newline at end of file