Skip to content

Commit

Permalink
RavenDB-5917 Improve the case when append entries failed to do mismat…
Browse files Browse the repository at this point in the history
…ch entry term by a lot of entries. Instead of decrementing the last log index by 1 we now improved this by doing a distributed binary search on the log entries to find the highest log entry which both the leader and the follower agreed on.
  • Loading branch information
Fitzchak Yitzchaki authored and ayende committed Dec 28, 2016
1 parent fb251cd commit 79c5a31
Show file tree
Hide file tree
Showing 3 changed files with 87 additions and 24 deletions.
53 changes: 38 additions & 15 deletions Rachis/Rachis/Behaviors/AbstractRaftStateBehavior.cs
Expand Up @@ -416,14 +416,16 @@ public virtual void Handle(AppendEntriesResponse resp)

public virtual AppendEntriesResponse Handle(AppendEntriesRequest req)
{
var lastLogIndex = Engine.PersistentState.LastLogEntry().Index;

if (FromOurTopology(req) == false)
{
_log.Info("Got an append entries message outside my cluster topology (id: {0}), ignoring", req.ClusterTopologyId);
return new AppendEntriesResponse
{
Success = false,
CurrentTerm = Engine.PersistentState.CurrentTerm,
LastLogIndex = Engine.PersistentState.LastLogEntry().Index,
LastLogIndex = lastLogIndex,
LeaderId = Engine.CurrentLeader,
Message = "Cannot accept append entries from a node outside my cluster. My topology id is: " + Engine.CurrentTopology.TopologyId,
From = Engine.Name,
Expand All @@ -443,7 +445,7 @@ public virtual AppendEntriesResponse Handle(AppendEntriesRequest req)
{
Success = false,
CurrentTerm = Engine.PersistentState.CurrentTerm,
LastLogIndex = Engine.PersistentState.LastLogEntry().Index,
LastLogIndex = lastLogIndex,
LeaderId = Engine.CurrentLeader,
Message = msg,
From = Engine.Name,
Expand All @@ -465,24 +467,38 @@ public virtual AppendEntriesResponse Handle(AppendEntriesRequest req)
var prevTerm = Engine.PersistentState.TermFor(req.PrevLogIndex) ?? 0;
if (prevTerm != req.PrevLogTerm)
{
var msg = string.Format(
"Rejecting append entries because msg previous term {0} is not the same as the persisted current term {1} at log index {2}",
req.PrevLogTerm, prevTerm, req.PrevLogIndex);
var midpointIndex = req.PrevLogIndex / 2;
var midpointTerm = Engine.PersistentState.TermFor(midpointIndex) ?? 0;

var msg = $"Rejecting append entries because msg previous term {req.PrevLogTerm} is not the same as the persisted current term {prevTerm}" +
$" at log index {req.PrevLogIndex}. Midpoint index {midpointIndex}, midpoint term: {midpointTerm}";
_log.Info(msg);

return new AppendEntriesResponse
{
Success = false,
CurrentTerm = Engine.PersistentState.CurrentTerm,
LastLogIndex = req.PrevLogIndex,
Message = msg,
LeaderId = Engine.CurrentLeader,
MidpointIndex = midpointIndex,
MidpointTerm = midpointTerm,
From = Engine.Name,
ClusterTopologyId = Engine.CurrentTopology.TopologyId,
};
}

LastHeartbeatTime = DateTime.UtcNow;
LastMessageTime = DateTime.UtcNow;

var appendEntriesResponse = new AppendEntriesResponse
{
Success = true,
CurrentTerm = Engine.PersistentState.CurrentTerm,
From = Engine.Name,
ClusterTopologyId = Engine.CurrentTopology.TopologyId,
};

if (req.Entries.Length > 0)
{
if (_log.IsDebugEnabled)
Expand Down Expand Up @@ -511,7 +527,20 @@ public virtual AppendEntriesResponse Handle(AppendEntriesRequest req)
}

if (skip != req.Entries.Length)
{
Engine.PersistentState.AppendToLog(Engine, req.Entries.Skip(skip), req.PrevLogIndex + skip);
}
else
{
// if we skipped the whole thing, this is fine, but let us hint to the leader that we are more
// up to date then it thinks
var lastReceivedIndex = req.Entries[req.Entries.Length - 1].Index;
appendEntriesResponse.MidpointIndex = lastReceivedIndex + (lastLogIndex - lastReceivedIndex) / 2;
appendEntriesResponse.MidpointTerm = Engine.PersistentState.TermFor(appendEntriesResponse.MidpointIndex.Value) ?? 0;

_log.Info($"Got {req.Entries.Length} entires from index {req.Entries[0].Index} with term {req.Entries[0].Term} skipping all. " +
$"Setting midpoint index to {appendEntriesResponse.MidpointIndex} with term {appendEntriesResponse.MidpointTerm}.");
}

var topologyChange = req.Entries.LastOrDefault(x => x.IsTopologyChange == true);

Expand All @@ -536,7 +565,7 @@ public virtual AppendEntriesResponse Handle(AppendEntriesRequest req)
}

var lastIndex = req.Entries.Length == 0 ?
Engine.PersistentState.LastLogEntry().Index :
lastLogIndex :
req.Entries[req.Entries.Length - 1].Index;
try
{
Expand All @@ -546,22 +575,16 @@ public virtual AppendEntriesResponse Handle(AppendEntriesRequest req)
CommitEntries(req.Entries, nextCommitIndex);
}

return new AppendEntriesResponse
{
Success = true,
CurrentTerm = Engine.PersistentState.CurrentTerm,
LastLogIndex = Engine.PersistentState.LastLogEntry().Index,
From = Engine.Name,
ClusterTopologyId = Engine.CurrentTopology.TopologyId,
};
appendEntriesResponse.LastLogIndex = lastLogIndex;
return appendEntriesResponse;
}
catch (Exception e)
{
return new AppendEntriesResponse
{
Success = false,
CurrentTerm = Engine.PersistentState.CurrentTerm,
LastLogIndex = Engine.PersistentState.LastLogEntry().Index,
LastLogIndex = lastLogIndex,
Message = "Failed to apply new entries. Reason: " + e,
From = Engine.Name,
ClusterTopologyId = Engine.CurrentTopology.TopologyId,
Expand Down
53 changes: 44 additions & 9 deletions Rachis/Rachis/Behaviors/LeaderStateBehavior.cs
Expand Up @@ -366,21 +366,17 @@ public override void Handle(AppendEntriesResponse resp)
return;
}


Debug.Assert(resp.From != null);
if (resp.Success == false)
{
_nextIndexes[resp.From] = resp.LastLogIndex - 1;
_matchIndexes[resp.From] = 0;

UpdateNodeIndexes(resp, resp.LastLogIndex - 1, 0);
if (_log.IsDebugEnabled)
_log.Debug("Received Success = false in AppendEntriesResponse from {1}. Now _nextIndexes[{1}] = {0}. Reason: {2}",
_nextIndexes[resp.From], resp.From, resp.Message);
_log.Debug($"Appended entries for {resp.From} failed. Reason: {resp.Message}");
return;
}

Debug.Assert(resp.From != null);
_nextIndexes[resp.From] = resp.LastLogIndex + 1;
_matchIndexes[resp.From] = resp.LastLogIndex;
UpdateNodeIndexes(resp, resp.LastLogIndex + 1, resp.LastLogIndex);

_lastContact[resp.From] = DateTime.UtcNow;
if (_log.IsDebugEnabled)
_log.Debug("Follower ({0}) has LastLogIndex = {1}", resp.From, resp.LastLogIndex);
Expand Down Expand Up @@ -443,6 +439,45 @@ public override void Handle(AppendEntriesResponse resp)
}
}

private void UpdateNodeIndexes(AppendEntriesResponse resp, long defaultNextIndex, long defaultMatchIndex)
{
if (resp.MidpointIndex == null || resp.MidpointTerm == null) // no information, just go back one step
{
_nextIndexes[resp.From] = defaultNextIndex;
_matchIndexes[resp.From] = defaultMatchIndex;

if (_log.IsDebugEnabled)
_log.Debug($"UpdateNodeIndexes: No midpoint index. Using default next index {defaultNextIndex}");
}
else
{
var midpointIndex = resp.MidpointIndex.Value;
var myMidpointTerm = Engine.PersistentState.TermFor(midpointIndex) ?? 0;
if (myMidpointTerm == resp.MidpointTerm.Value)
{
// we know that we are a match on the middle, so let us set the
// next attempt to be half way from the midpoint to the end
_nextIndexes[resp.From] = midpointIndex + Math.Abs(resp.LastLogIndex - midpointIndex) / 2;
_matchIndexes[resp.From] = midpointIndex;

if (_log.IsDebugEnabled)
_log.Debug($"UpdateNodeIndexes: Got match for mindpoint index: {midpointIndex}, term: {myMidpointTerm}.");
}
else
{
// we don't have a match, so we need to go backward yet
_nextIndexes[resp.From] = midpointIndex - Math.Abs(resp.LastLogIndex - midpointIndex) / 2;
_matchIndexes[resp.From] = 0;

if (_log.IsDebugEnabled)
_log.Debug($"UpdateNodeIndexes: Got mismatch for mindpoint index: {midpointIndex}, leader term: {myMidpointTerm}, follower term: {resp.MidpointTerm.Value}");
}
}

if (_log.IsDebugEnabled)
_log.Debug($"UpdateNodeIndexes operation result for {resp.From}: _nextIndexes = {_nextIndexes[resp.From]}, _matchIndexes = {_matchIndexes[resp.From]}.");
}

private void PromoteNodeToVoter(AppendEntriesResponse resp)
{
// if we got a successful append entries response from a promotable node, and it has caught up
Expand Down
5 changes: 5 additions & 0 deletions Rachis/Rachis/Messages/AppendEntriesResponse.cs
Expand Up @@ -6,9 +6,14 @@ public class AppendEntriesResponse : BaseMessage

public long LastLogIndex { get; set; }

public long? MidpointTerm { get; set; }

public long? MidpointIndex { get; set; }

public bool Success { get; set; }

public string Message { get; set; }

public string LeaderId { get; set; }
}
}

0 comments on commit 79c5a31

Please sign in to comment.