Merge branch 'main' into yrajas/objectstorerecovery
badrishc committed May 7, 2024
2 parents 631eef7 + a6218f9 commit 66a4cb5
Showing 8 changed files with 86 additions and 30 deletions.
2 changes: 1 addition & 1 deletion libs/cluster/Server/ClusterProvider.cs
@@ -287,7 +287,7 @@ internal bool WaitForConfigTransition()
foreach (var s in sessions)
{
var entryEpoch = s.LocalCurrentEpoch;
if (entryEpoch != 0 && entryEpoch >= currentEpoch)
if (entryEpoch != 0 && entryEpoch < currentEpoch)
goto retry;
}
break;
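The flipped comparison turns this loop into a proper epoch barrier: the caller now retries while any active session is still pinned to an epoch older than the current one, and proceeds only once every session has caught up. A minimal sketch of that barrier shape, with a hypothetical Session type standing in for the cluster session objects used above:

using System.Collections.Generic;
using System.Threading;

// Hypothetical stand-in for the cluster session type; only the epoch field matters here.
class Session { public long LocalCurrentEpoch; }

static class ConfigTransitionSketch
{
    // Spin until every pinned session has observed an epoch >= currentEpoch.
    public static void WaitForConfigTransition(IReadOnlyList<Session> sessions, long currentEpoch)
    {
        while (true)
        {
            var lagging = false;
            foreach (var s in sessions)
            {
                var entryEpoch = s.LocalCurrentEpoch;
                // Epoch 0 means the session is not pinned to any configuration; skip it.
                if (entryEpoch != 0 && entryEpoch < currentEpoch)
                {
                    lagging = true; // someone still sees the old configuration
                    break;
                }
            }
            if (!lagging) return; // barrier reached
            Thread.Yield();       // back off and re-check
        }
    }
}
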
2 changes: 1 addition & 1 deletion libs/cluster/Server/Replication/CheckpointEntry.cs
@@ -43,7 +43,7 @@ public CheckpointEntry()
}

public long GetMinAofCoveredAddress()
=> Math.Min(storeCheckpointCoveredAofAddress, objectCheckpointCoveredAofAddress);
=> Math.Max(Math.Min(storeCheckpointCoveredAofAddress, objectCheckpointCoveredAofAddress), 64);

/// <summary>
/// Indicate addition of new reader by trying to increment reader counter
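With the added Math.Max, GetMinAofCoveredAddress can no longer fall below 64, which presumably matches the first valid AOF address used elsewhere in the replication code (ReplicationManager.kFirstValidAofAddress); without the clamp, a store that has never been checkpointed would report 0 and drag the minimum down. A hedged sketch of the clamped computation:

using System;

static class CheckpointEntrySketch
{
    // Assumption: 64 mirrors the replication layer's first valid AOF address.
    const long kFirstValidAofAddress = 64;

    public static long GetMinAofCoveredAddress(long storeCoveredAofAddress, long objectStoreCoveredAofAddress)
        => Math.Max(Math.Min(storeCoveredAofAddress, objectStoreCoveredAofAddress), kFirstValidAofAddress);
}

// Example: an object store with no checkpoint may report 0, so
// GetMinAofCoveredAddress(1024, 0) now yields 64 instead of 0.
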
@@ -47,6 +47,7 @@ public void Dispose()
public async Task<bool> SendCheckpoint()
{
errorMsg = default;
var retryCount = 0;
var storeCkptManager = clusterProvider.GetReplicationLogCheckpointManager(StoreType.Main);
var objectStoreCkptManager = clusterProvider.GetReplicationLogCheckpointManager(StoreType.Object);
var current = clusterProvider.clusterManager.CurrentConfig;
@@ -96,6 +97,8 @@ public async Task<bool> SendCheckpoint()
{
localEntry.RemoveReader();
_ = Thread.Yield();
if (retryCount++ > 10)
throw new GarnetException("Attaching replica maximum retry count reached!");
goto retry;
}
}
@@ -110,6 +113,8 @@ public async Task<bool> SendCheckpoint()
{
localEntry.RemoveReader();
_ = Thread.Yield();
if (retryCount++ > 10)
throw new GarnetException("Attaching replica maximum retry count reached!");
goto retry;
}
}
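Both retry sites now bump a shared retryCount, so an attach that repeatedly loses the race for a stable checkpoint entry fails with a GarnetException after ten attempts instead of spinning on goto retry indefinitely. A minimal sketch of this bounded-retry shape, where tryAcquireCheckpoint is a hypothetical delegate standing in for the AddReader/validate/RemoveReader handshake performed in SendCheckpoint:

using System;
using System.Threading;

static class AttachRetrySketch
{
    public static void AcquireWithRetryCap(Func<bool> tryAcquireCheckpoint, int maxRetries = 10)
    {
        var retryCount = 0;
    retry:
        if (!tryAcquireCheckpoint())
        {
            _ = Thread.Yield();
            if (retryCount++ > maxRetries)
                throw new Exception("Attaching replica maximum retry count reached!"); // GarnetException in the real code
            goto retry;
        }
        // ...stream the acquired checkpoint to the replica...
    }
}
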
@@ -187,7 +192,7 @@ public async Task<bool> SendCheckpoint()
var beginAddress = RecoveredReplicationOffset;
if (!recoverFromRemote)
{
//If replica is ahead of this primary it will force itself to forget and start syncing from RecoveredReplicationOffset
// If replica is ahead of this primary it will force itself to forget and start syncing from RecoveredReplicationOffset
if (replicaAofBeginAddress > ReplicationManager.kFirstValidAofAddress && replicaAofBeginAddress > RecoveredReplicationOffset)
{
logger?.LogInformation(
19 changes: 14 additions & 5 deletions libs/server/Resp/ArrayCommands.cs
@@ -668,15 +668,25 @@ private bool NetworkSELECT(byte* ptr)

readHead = (int)(ptr - recvBufferPtr);

if (string.Equals(result, "0"))
if (storeWrapper.serverOptions.EnableCluster)
{
while (!RespWriteUtils.WriteDirect(CmdStrings.RESP_OK, ref dcurr, dend))
// Cluster mode does not allow DBID
while (!RespWriteUtils.WriteError(CmdStrings.RESP_ERR_GENERIC_SELECT_CLUSTER_MODE, ref dcurr, dend))
SendAndReset();
}
else
{
while (!RespWriteUtils.WriteError("ERR invalid database index."u8, ref dcurr, dend))
SendAndReset();

if (string.Equals(result, "0"))
{
while (!RespWriteUtils.WriteDirect(CmdStrings.RESP_OK, ref dcurr, dend))
SendAndReset();
}
else
{
while (!RespWriteUtils.WriteError(CmdStrings.RESP_ERR_GENERIC_SELECT_INVALID_INDEX, ref dcurr, dend))
SendAndReset();
}
}
return true;
}
@@ -728,7 +738,6 @@ private bool NetworkKEYS<TGarnetApi>(int count, byte* ptr, ref TGarnetApi storag
return true;
}


private bool NetworkSCAN<TGarnetApi>(int count, byte* ptr, ref TGarnetApi storageApi)
where TGarnetApi : IGarnetApi
{
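Because this hunk interleaves removed and added lines, the resulting control flow is easier to see in isolation: SELECT is now rejected outright when cluster mode is enabled, and only otherwise is the database index validated (a single database, index 0, is supported). A hypothetical condensed form of that decision, returning plain strings instead of writing RESP replies through RespWriteUtils:

static class SelectSketch
{
    public static string HandleSelect(bool clusterEnabled, string index)
    {
        if (clusterEnabled)
            return "ERR SELECT is not allowed in cluster mode"; // cluster mode does not allow DBID
        if (index == "0")
            return "+OK";                                       // only database 0 exists
        return "ERR invalid database index.";
    }
}
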
3 changes: 3 additions & 0 deletions libs/server/Resp/CmdStrings.cs
@@ -124,6 +124,9 @@ public static ReadOnlySpan<byte> GetConfig(ReadOnlySpan<byte> key)
public static ReadOnlySpan<byte> RESP_ERR_GENERIC_VALUE_IS_NOT_INTEGER => "ERR value is not an integer or out of range."u8;
public static ReadOnlySpan<byte> RESP_ERR_GENERIC_UKNOWN_SUBCOMMAND => "ERR Unknown subcommand. Try LATENCY HELP."u8;
public static ReadOnlySpan<byte> RESP_ERR_GENERIC_INDEX_OUT_RANGE => "ERR index out of range"u8;
public static ReadOnlySpan<byte> RESP_ERR_GENERIC_SELECT_INVALID_INDEX => "ERR invalid database index."u8;
public static ReadOnlySpan<byte> RESP_ERR_GENERIC_SELECT_CLUSTER_MODE => "ERR SELECT is not allowed in cluster mode"u8;

/// <summary>
/// Response string templates
/// </summary>
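The two new entries follow the file's existing pattern of exposing RESP error strings as UTF-8 literals returned through ReadOnlySpan<byte>, so replies can be copied to the network buffer without allocating strings. A small, standalone illustration of that pattern (not Garnet's actual write path):

using System;
using System.Text;

static class CmdStringsSketch
{
    public static ReadOnlySpan<byte> RESP_ERR_GENERIC_SELECT_CLUSTER_MODE
        => "ERR SELECT is not allowed in cluster mode"u8;

    static void Main()
    {
        // The u8 literal is stored as raw UTF-8 bytes in the assembly; decoding happens only for display here.
        Console.WriteLine(Encoding.UTF8.GetString(RESP_ERR_GENERIC_SELECT_CLUSTER_MODE));
    }
}
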
19 changes: 10 additions & 9 deletions libs/server/StoreWrapper.cs
@@ -220,7 +220,8 @@ internal void Recover()
{
storeVersion = !recoverMainStoreFromToken ? store.Recover() : store.Recover(storeIndexToken, storeHlogToken);
if (objectStore != null) objectStoreVersion = !recoverObjectStoreFromToken ? objectStore.Recover() : objectStore.Recover(objectStoreIndexToken, objectStoreHlogToken);
lastSaveTime = DateTimeOffset.UtcNow;
if (storeVersion > 0 || objectStoreVersion > 0)
lastSaveTime = DateTimeOffset.UtcNow;
}
catch (Exception ex)
{
@@ -269,8 +270,8 @@ public long ReplayAOF(long untilAddress = -1)
long replicationOffset = 0;
try
{
//When replaying AOF we do not want to write record again to AOF.
//So initialize local AofProcessor with recordToAof: false.
// When replaying AOF we do not want to write record again to AOF.
// So initialize local AofProcessor with recordToAof: false.
var aofProcessor = new AofProcessor(this, recordToAof: false, logger);
aofProcessor.Recover(untilAddress);
aofProcessor.Dispose();
@@ -580,22 +581,22 @@ void CompleteCheckpoint()
/// <summary>
/// Take a checkpoint if no checkpoint was taken after the provided time offset
/// </summary>
/// <param name="afterTime"></param>
/// <param name="entryTime"></param>
/// <returns></returns>
public async Task TakeOnDemandCheckpoint(DateTimeOffset afterTime)
public async Task TakeOnDemandCheckpoint(DateTimeOffset entryTime)
{
//Take lock to ensure not other task will be taking a checkpoint
// Take lock to ensure no other task will be taking a checkpoint
while (!StartCheckpoint())
await Task.Yield();

//If an external task has taken a checkpoint after the provided afterTime return
if (this.lastSaveTime > afterTime)
// If an external task has taken a checkpoint beyond the provided entryTime return
if (this.lastSaveTime > entryTime)
{
CompleteCheckpoint();
return;
}

//If no newer checkpoint was taken compared to the provided afterTime take a checkpoint
// Necessary to take a checkpoint because the latest checkpoint is before entryTime
await CheckpointTask(StoreType.All, logger: logger);
}

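Taken together, the two StoreWrapper hunks make the on-demand checkpoint gate safe at startup: Recover() now advances lastSaveTime only when a checkpoint version was actually recovered, so a node that recovered nothing cannot wrongly skip the checkpoint requested via TakeOnDemandCheckpoint. A condensed sketch of that gate, with hypothetical delegates standing in for StartCheckpoint, CompleteCheckpoint, and CheckpointTask:

using System;
using System.Threading.Tasks;

class CheckpointGateSketch
{
    DateTimeOffset lastSaveTime = DateTimeOffset.FromUnixTimeSeconds(0);

    // Mirrors the Recover() change: record a save time only if a checkpoint was actually recovered.
    public void OnRecovered(long storeVersion, long objectStoreVersion)
    {
        if (storeVersion > 0 || objectStoreVersion > 0)
            lastSaveTime = DateTimeOffset.UtcNow;
    }

    public async Task TakeOnDemandCheckpoint(
        DateTimeOffset entryTime,
        Func<bool> startCheckpoint, Action completeCheckpoint, Func<Task> checkpointTask)
    {
        // Take the checkpoint lock so no other task checkpoints concurrently.
        while (!startCheckpoint())
            await Task.Yield();

        // An external task already checkpointed after the caller observed entryTime: nothing to do.
        if (lastSaveTime > entryTime)
        {
            completeCheckpoint();
            return;
        }

        // The latest checkpoint predates entryTime, so take one now.
        await checkpointTask();
    }
}
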
33 changes: 24 additions & 9 deletions test/Garnet.test.cluster/ClusterReplicationTests.cs
@@ -236,15 +236,18 @@ public void ClusterSRPrimaryCheckpoint([Values] bool performRMW, [Values] bool d
context.PopulatePrimary(ref context.kvPairs, keyLength, kvpairCount, 0);
else
context.PopulatePrimaryRMW(ref context.kvPairs, keyLength, kvpairCount, 0, addCount);

var primaryLastSaveTime = context.clusterTestUtils.LastSave(0, logger: context.logger);
var replicaLastSaveTime = context.clusterTestUtils.LastSave(1, logger: context.logger);
context.clusterTestUtils.Checkpoint(0, logger: context.logger);

// Populate Primary
context.PopulatePrimary(ref context.kvPairs, keyLength, kvpairCount, 0);
context.ValidateKVCollectionAgainstReplica(ref context.kvPairs, 1);

context.clusterTestUtils.WaitForReplicaAofSync(0, 1, context.logger);
context.clusterTestUtils.WaitFirstCheckpoint(0, context.logger);
context.clusterTestUtils.WaitFirstCheckpoint(1, context.logger);
context.clusterTestUtils.WaitCheckpoint(0, primaryLastSaveTime, logger: context.logger);
context.clusterTestUtils.WaitCheckpoint(1, replicaLastSaveTime, logger: context.logger);

// Shutdown secondary
context.nodes[1].Store.CommitAOF(true);
@@ -599,9 +602,11 @@ public void ClusterReplicationSimpleFailover([Values] bool performRMW, [Values]

if (checkpoint)
{
var primaryLastSaveTime = context.clusterTestUtils.LastSave(0, logger: context.logger);
var replicaLastSaveTime = context.clusterTestUtils.LastSave(1, logger: context.logger);
context.clusterTestUtils.Checkpoint(0);
context.clusterTestUtils.WaitFirstCheckpoint(0, logger: context.logger);
context.clusterTestUtils.WaitFirstCheckpoint(1, logger: context.logger);
context.clusterTestUtils.WaitCheckpoint(0, primaryLastSaveTime, logger: context.logger);
context.clusterTestUtils.WaitCheckpoint(1, replicaLastSaveTime, logger: context.logger);
}

#region InitiateFailover
@@ -667,8 +672,9 @@ public void ClusterFailoverAttachReplicas([Values] bool performRMW, [Values] boo

if (takePrimaryCheckpoint)
{
var primaryLastSaveTime = context.clusterTestUtils.LastSave(0, logger: context.logger);
context.clusterTestUtils.Checkpoint(0, logger: context.logger);
context.clusterTestUtils.WaitFirstCheckpoint(0, logger: context.logger);
context.clusterTestUtils.WaitCheckpoint(0, primaryLastSaveTime, logger: context.logger);
}

// Wait for replication offsets to synchronize
@@ -692,8 +698,9 @@ public void ClusterFailoverAttachReplicas([Values] bool performRMW, [Values] boo

if (takeNewPrimaryCheckpoint)
{
var newPrimaryLastSaveTime = context.clusterTestUtils.LastSave(1, logger: context.logger);
context.clusterTestUtils.Checkpoint(1, logger: context.logger);
context.clusterTestUtils.WaitFirstCheckpoint(1, logger: context.logger);
context.clusterTestUtils.WaitCheckpoint(1, newPrimaryLastSaveTime, logger: context.logger);
}
context.clusterTestUtils.WaitForReplicaAofSync(1, 2, context.logger);

Expand Down Expand Up @@ -912,11 +919,19 @@ void ClusterDivergentReplicasTest(bool performRMW, bool disableObjects, bool ckp
}
else context.PopulatePrimaryWithObjects(ref context.kvPairsObj, keyLength, kvpairCount, primaryIndex: oldPrimaryIndex, set: set);

if (ckptBeforeDivergence) context.clusterTestUtils.Checkpoint(oldPrimaryIndex, logger: context.logger);
if (ckptBeforeDivergence)
{
var oldPrimaryLastSaveTime = context.clusterTestUtils.LastSave(oldPrimaryIndex, logger: context.logger);
var newPrimaryLastSaveTime = context.clusterTestUtils.LastSave(newPrimaryIndex, logger: context.logger);
var replicaLastSaveTime = context.clusterTestUtils.LastSave(replicaIndex, logger: context.logger);
context.clusterTestUtils.Checkpoint(oldPrimaryIndex, logger: context.logger);
context.clusterTestUtils.WaitCheckpoint(oldPrimaryIndex, oldPrimaryLastSaveTime, logger: context.logger);
context.clusterTestUtils.WaitCheckpoint(newPrimaryIndex, newPrimaryLastSaveTime, logger: context.logger);
context.clusterTestUtils.WaitCheckpoint(replicaIndex, replicaLastSaveTime, logger: context.logger);
}

context.clusterTestUtils.WaitForReplicaAofSync(oldPrimaryIndex, newPrimaryIndex, context.logger);
context.clusterTestUtils.WaitForReplicaAofSync(oldPrimaryIndex, replicaIndex, context.logger);
context.clusterTestUtils.WaitFirstCheckpoint(newPrimaryIndex, logger: context.logger);
context.clusterTestUtils.WaitFirstCheckpoint(replicaIndex, logger: context.logger);

// Make this replica of no-one
_ = context.clusterTestUtils.ReplicaOf(1, logger: context.logger);
31 changes: 27 additions & 4 deletions test/Garnet.test.cluster/ClusterTestUtils.cs
@@ -2647,16 +2647,39 @@ public void Checkpoint(IPEndPoint endPoint, ILogger logger = null)
}
}

public void WaitFirstCheckpoint(int nodeIndex, ILogger logger = null)
=> WaitCheckpoint((IPEndPoint)endpoints[nodeIndex], logger: logger);
public DateTime LastSave(int nodeIndex, ILogger logger = null)
=> LastSave((IPEndPoint)endpoints[nodeIndex], logger: logger);

public void WaitCheckpoint(IPEndPoint endPoint, ILogger logger = null)
public DateTime LastSave(IPEndPoint endPoint, ILogger logger = null)
{
try
{
var server = redis.GetServer(endPoint);
while (server.LastSave().Ticks == DateTimeOffset.FromUnixTimeSeconds(0).Ticks)
return server.LastSave();
}
catch (Exception ex)
{
logger?.LogError(ex, "An error has occurred; WaitCheckpoint");
Assert.Fail();
}
return default;
}

public void WaitCheckpoint(int nodeIndex, DateTime time, ILogger logger = null)
=> WaitCheckpoint((IPEndPoint)endpoints[nodeIndex], time: time, logger: logger);

public void WaitCheckpoint(IPEndPoint endPoint, DateTime time, ILogger logger = null)
{
try
{
var server = redis.GetServer(endPoint);
while (true)
{
var lastSaveTime = server.LastSave();
if (lastSaveTime >= time)
break;
BackOff();
}
}
catch (Exception ex)
{
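These helpers replace the old WaitFirstCheckpoint (which only waited for LASTSAVE to become non-zero) with a baseline comparison: LastSave captures a node's current LASTSAVE timestamp and WaitCheckpoint polls until the timestamp reaches the supplied baseline. A usage sketch in the spirit of the updated tests, written against a plain StackExchange.Redis IServer rather than the test utilities:

using System;
using System.Threading;
using StackExchange.Redis;

static class WaitCheckpointSketch
{
    // Poll LASTSAVE until it reaches the given baseline, mirroring ClusterTestUtils.WaitCheckpoint.
    public static void WaitCheckpoint(IServer server, DateTime time)
    {
        while (server.LastSave() < time)
            Thread.Sleep(100); // stands in for the test utils' BackOff()
    }

    public static void Example(IServer server)
    {
        var baseline = server.LastSave();     // ClusterTestUtils.LastSave(nodeIndex)
        server.Save(SaveType.BackgroundSave); // analogous to ClusterTestUtils.Checkpoint(nodeIndex)
        WaitCheckpoint(server, baseline);     // wait until LASTSAVE reflects the checkpoint
    }
}
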
