Skip to content

Commit

Permalink
Inline RAFT message processing.
Browse files Browse the repository at this point in the history
Process RAFT messages directly inside the role handlers, thereby removing
the need to change role, and having to send the message back for later
processing by that role.

The tests have been restructured to reflect the requirement for
some messages to be handled in mostly the same way, regardless of
the current role.
  • Loading branch information
apcj authored and Mark Needham committed Jan 27, 2016
1 parent c2a91ff commit a0ccdf1
Show file tree
Hide file tree
Showing 20 changed files with 1,035 additions and 780 deletions.
Expand Up @@ -245,7 +245,7 @@ public synchronized void handle( Serializable incomingMessage )
try try
{ {
handlingMessage = true; handlingMessage = true;
Outcome<MEMBER> outcome = currentRole.role.handle( (RaftMessages.Message<MEMBER>) incomingMessage, state, log ); Outcome<MEMBER> outcome = currentRole.handler.handle( (RaftMessages.Message<MEMBER>) incomingMessage, state, log );


handleOutcome( outcome ); handleOutcome( outcome );
currentRole = outcome.getNewRole(); currentRole = outcome.getNewRole();
Expand Down
Expand Up @@ -78,6 +78,12 @@ public Message<MEMBER> message()
{ {
return message; return message;
} }

@Override
public String toString()
{
return format( "Directed{to=%s, message=%s}", to, message );
}
} }


interface Vote interface Vote
Expand Down Expand Up @@ -355,7 +361,7 @@ public int hashCode()
@Override @Override
public String toString() public String toString()
{ {
return String.format( "AppendEntries.Response from %s {term=%d, success=%s, matchIndex=%d, appendIndex=%d}", return format( "AppendEntries.Response from %s {term=%d, success=%s, matchIndex=%d, appendIndex=%d}",
from, term, success, matchIndex, appendIndex ); from, term, success, matchIndex, appendIndex );
} }
} }
Expand Down
@@ -0,0 +1,113 @@
/*
* Copyright (c) 2002-2016 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.coreedge.raft.roles;

import org.neo4j.coreedge.raft.RaftMessages;
import org.neo4j.coreedge.raft.log.RaftLogEntry;
import org.neo4j.coreedge.raft.log.RaftStorageException;
import org.neo4j.coreedge.raft.outcome.AppendLogEntry;
import org.neo4j.coreedge.raft.outcome.BatchAppendLogEntries;
import org.neo4j.coreedge.raft.outcome.Outcome;
import org.neo4j.coreedge.raft.outcome.ShipCommand;
import org.neo4j.coreedge.raft.outcome.TruncateLogCommand;
import org.neo4j.coreedge.raft.replication.ReplicatedContent;
import org.neo4j.coreedge.raft.state.ReadableRaftState;

public class Appending
{
public static <MEMBER> void handleAppendEntriesRequest(
ReadableRaftState<MEMBER> state, Outcome<MEMBER> outcome, RaftMessages.AppendEntries.Request<MEMBER> request )
throws RaftStorageException
{
if ( request.leaderTerm() < state.term() )
{
RaftMessages.AppendEntries.Response<MEMBER> appendResponse = new RaftMessages.AppendEntries.Response<>(
state.myself(), state.term(), false, -1, state.entryLog().appendIndex() );

outcome.addOutgoingMessage( new RaftMessages.Directed<>( request.from(), appendResponse ) );
return;
}

outcome.renewElectionTimeout();
outcome.setNextTerm( request.leaderTerm() );
outcome.setLeader( request.from() );
outcome.setLeaderCommit( request.leaderCommit() );

if ( !Follower.logHistoryMatches( state, request.prevLogIndex(), request.prevLogTerm() ) )
{
assert request.prevLogIndex() > -1 && request.prevLogTerm() > -1;
RaftMessages.AppendEntries.Response<MEMBER> appendResponse = new RaftMessages.AppendEntries.Response<>(
state.myself(), request.leaderTerm(), false, -1, state.entryLog().appendIndex() );

outcome.addOutgoingMessage( new RaftMessages.Directed<>( request.from(), appendResponse ) );
return;
}

long baseIndex = request.prevLogIndex() + 1;
int offset;

/* Find possible truncation point. */
for ( offset = 0; offset < request.entries().length; offset++ )
{
long logTerm = state.entryLog().readEntryTerm( baseIndex + offset );

if( baseIndex + offset > state.entryLog().appendIndex() )
{
/* entry doesn't exist */
break;
}
else if ( logTerm != request.entries()[offset].term() )
{
outcome.addLogCommand( new TruncateLogCommand( baseIndex + offset ) );
break;
}
}

if( offset < request.entries().length )
{
outcome.addLogCommand( new BatchAppendLogEntries( baseIndex, offset, request.entries() ) );
}

Follower.commitToLogOnUpdate( state, request.prevLogIndex() + request.entries().length, request.leaderCommit
(), outcome );

long endMatchIndex = request.prevLogIndex() + request.entries().length; // this is the index of the last incoming entry
if ( endMatchIndex >= 0 )
{
RaftMessages.AppendEntries.Response<MEMBER> appendResponse = new RaftMessages.AppendEntries.Response<>( state.myself(), request.leaderTerm(), true, endMatchIndex, endMatchIndex );
outcome.addOutgoingMessage( new RaftMessages.Directed<>( request.from(), appendResponse ) );
}
}

public static <MEMBER> void appendNewEntry( ReadableRaftState<MEMBER> ctx, Outcome<MEMBER> outcome, ReplicatedContent
content ) throws RaftStorageException
{
long prevLogIndex = ctx.entryLog().appendIndex();
long prevLogTerm = prevLogIndex == -1 ? -1 :
prevLogIndex > ctx.lastLogIndexBeforeWeBecameLeader() ?
ctx.term() :
ctx.entryLog().readLogEntry( prevLogIndex ).term();

RaftLogEntry newLogEntry = new RaftLogEntry( ctx.term(), content );

outcome.addShipCommand( new ShipCommand.NewEntry( prevLogIndex, prevLogTerm, newLogEntry ) );
outcome.addLogCommand( new AppendLogEntry( prevLogIndex + 1, newLogEntry ) );
}
}
Expand Up @@ -52,7 +52,7 @@ public <MEMBER> Outcome<MEMBER> handle( RaftMessages.Message<MEMBER> message,
} }


outcome.setNextRole( FOLLOWER ); outcome.setNextRole( FOLLOWER );
outcome.addOutgoingMessage( new RaftMessages.Directed<>( ctx.myself(), message ) ); Heart.beat( ctx, outcome, (RaftMessages.Heartbeat<MEMBER>) message );
break; break;
} }


Expand All @@ -71,7 +71,7 @@ public <MEMBER> Outcome<MEMBER> handle( RaftMessages.Message<MEMBER> message,
} }


outcome.setNextRole( FOLLOWER ); outcome.setNextRole( FOLLOWER );
outcome.addOutgoingMessage( new RaftMessages.Directed<>( ctx.myself(), req ) ); Appending.handleAppendEntriesRequest( ctx, outcome, req );
break; break;
} }


Expand Down Expand Up @@ -101,7 +101,7 @@ else if ( res.term() < ctx.term() || !res.voteGranted() )
ctx.term(), ctx.myself(), outcome.getVotesForMe() ); ctx.term(), ctx.myself(), outcome.getVotesForMe() );


outcome.setLeader( ctx.myself() ); outcome.setLeader( ctx.myself() );
Leader.appendNewEntry( ctx, outcome, new NewLeaderBarrier() ); Appending.appendNewEntry( ctx, outcome, new NewLeaderBarrier() );


outcome.setLastLogIndexBeforeWeBecameLeader( ctx.entryLog().appendIndex() ); outcome.setLastLogIndexBeforeWeBecameLeader( ctx.entryLog().appendIndex() );
outcome.setNextRole( LEADER ); outcome.setNextRole( LEADER );
Expand All @@ -115,11 +115,9 @@ else if ( res.term() < ctx.term() || !res.voteGranted() )


if ( req.term() > ctx.term() ) if ( req.term() > ctx.term() )
{ {
outcome.setNextTerm( req.term() );
outcome.getVotesForMe().clear(); outcome.getVotesForMe().clear();

outcome.setNextRole( FOLLOWER ); outcome.setNextRole( FOLLOWER );
outcome.addOutgoingMessage( new RaftMessages.Directed<>( ctx.myself(), req ) ); Voting.handleVoteRequest( ctx, outcome, req );
break; break;
} }


Expand Down
Expand Up @@ -24,24 +24,21 @@
import org.neo4j.coreedge.raft.RaftMessageHandler; import org.neo4j.coreedge.raft.RaftMessageHandler;
import org.neo4j.coreedge.raft.RaftMessages; import org.neo4j.coreedge.raft.RaftMessages;
import org.neo4j.coreedge.raft.RaftMessages.AppendEntries; import org.neo4j.coreedge.raft.RaftMessages.AppendEntries;
import org.neo4j.coreedge.raft.RaftMessages.AppendEntries.Response;
import org.neo4j.coreedge.raft.RaftMessages.Heartbeat; import org.neo4j.coreedge.raft.RaftMessages.Heartbeat;
import org.neo4j.coreedge.raft.log.RaftStorageException; import org.neo4j.coreedge.raft.log.RaftStorageException;
import org.neo4j.coreedge.raft.outcome.BatchAppendLogEntries;
import org.neo4j.coreedge.raft.outcome.CommitCommand; import org.neo4j.coreedge.raft.outcome.CommitCommand;
import org.neo4j.coreedge.raft.outcome.Outcome; import org.neo4j.coreedge.raft.outcome.Outcome;
import org.neo4j.coreedge.raft.outcome.TruncateLogCommand;
import org.neo4j.coreedge.raft.state.ReadableRaftState; import org.neo4j.coreedge.raft.state.ReadableRaftState;
import org.neo4j.logging.Log; import org.neo4j.logging.Log;


import static java.lang.Long.min; import static java.lang.Long.min;
import static org.neo4j.coreedge.raft.Ballot.shouldVoteFor;
import static org.neo4j.coreedge.raft.roles.Role.CANDIDATE; import static org.neo4j.coreedge.raft.roles.Role.CANDIDATE;
import static org.neo4j.coreedge.raft.roles.Role.FOLLOWER; import static org.neo4j.coreedge.raft.roles.Role.FOLLOWER;


public class Follower implements RaftMessageHandler public class Follower implements RaftMessageHandler
{ {
private static <MEMBER> boolean logHistoryMatches( ReadableRaftState<MEMBER> ctx, long prevLogIndex, long prevLogTerm ) public static <MEMBER> boolean logHistoryMatches( ReadableRaftState<MEMBER> ctx, long prevLogIndex, long prevLogTerm )
throws RaftStorageException throws RaftStorageException
{ {
// NOTE: A previous log index of -1 means no history, // NOTE: A previous log index of -1 means no history,
Expand All @@ -52,7 +49,7 @@ private static <MEMBER> boolean logHistoryMatches( ReadableRaftState<MEMBER> ctx
return prevLogIndex == -1 || ctx.entryLog().readEntryTerm( prevLogIndex ) == prevLogTerm; return prevLogIndex == -1 || ctx.entryLog().readEntryTerm( prevLogIndex ) == prevLogTerm;
} }


private static <MEMBER> boolean commitToLogOnUpdate( ReadableRaftState<MEMBER> ctx, long indexOfLastNewEntry, public static <MEMBER> boolean commitToLogOnUpdate( ReadableRaftState<MEMBER> ctx, long indexOfLastNewEntry,
long leaderCommit, Outcome<MEMBER> outcome ) long leaderCommit, Outcome<MEMBER> outcome )
{ {
long newCommitIndex = min( leaderCommit, indexOfLastNewEntry ); long newCommitIndex = min( leaderCommit, indexOfLastNewEntry );
Expand All @@ -75,115 +72,19 @@ public <MEMBER> Outcome<MEMBER> handle( RaftMessages.Message<MEMBER> message, Re
{ {
case HEARTBEAT: case HEARTBEAT:
{ {
Heartbeat<MEMBER> req = (Heartbeat<MEMBER>) message; Heart.beat( ctx, outcome, (Heartbeat<MEMBER>) message );

if ( req.leaderTerm() < ctx.term() )
{
break;
}

outcome.renewElectionTimeout();
outcome.setNextTerm( req.leaderTerm() );
outcome.setLeader( req.from() );
outcome.setLeaderCommit( req.commitIndex() );

if ( !logHistoryMatches( ctx, req.commitIndex(), req.commitIndexTerm() ) )
{
break;
}

commitToLogOnUpdate( ctx, req.commitIndex(), req.commitIndex(), outcome );
break; break;
} }


case APPEND_ENTRIES_REQUEST: case APPEND_ENTRIES_REQUEST:
{ {
AppendEntries.Request<MEMBER> req = (AppendEntries.Request<MEMBER>) message; Appending.handleAppendEntriesRequest( ctx, outcome, (AppendEntries.Request<MEMBER>) message );

if ( req.leaderTerm() < ctx.term() )
{
Response<MEMBER> appendResponse = new Response<>(
ctx.myself(), ctx.term(), false, -1, ctx.entryLog().appendIndex() );

outcome.addOutgoingMessage( new RaftMessages.Directed<>( req.from(), appendResponse ) );
break;
}

outcome.renewElectionTimeout();
outcome.setNextTerm( req.leaderTerm() );
outcome.setLeader( req.from() );
outcome.setLeaderCommit( req.leaderCommit() );

if ( !logHistoryMatches( ctx, req.prevLogIndex(), req.prevLogTerm() ) )
{
assert req.prevLogIndex() > -1 && req.prevLogTerm() > -1;
Response<MEMBER> appendResponse = new Response<>(
ctx.myself(), req.leaderTerm(), false, -1, ctx.entryLog().appendIndex() );

outcome.addOutgoingMessage( new RaftMessages.Directed<>( req.from(), appendResponse ) );
break;
}

long baseIndex = req.prevLogIndex() + 1;
int offset;

/* Find possible truncation point. */
for ( offset = 0; offset < req.entries().length; offset++ )
{
long logTerm = ctx.entryLog().readEntryTerm( baseIndex + offset );

if( baseIndex + offset > ctx.entryLog().appendIndex() )
{
/* entry doesn't exist */
break;
}
else if ( logTerm != req.entries()[offset].term() )
{
outcome.addLogCommand( new TruncateLogCommand( baseIndex + offset ) );
break;
}
}

if( offset < req.entries().length )
{
outcome.addLogCommand( new BatchAppendLogEntries( baseIndex, offset, req.entries() ) );
}

commitToLogOnUpdate( ctx, req.prevLogIndex() + req.entries().length, req.leaderCommit(), outcome );

long endMatchIndex = req.prevLogIndex() + req.entries().length; // this is the index of the last incoming entry
if ( endMatchIndex >= 0 )
{
Response<MEMBER> appendResponse = new Response<>( ctx.myself(), req.leaderTerm(), true, endMatchIndex, endMatchIndex );
outcome.addOutgoingMessage( new RaftMessages.Directed<>( req.from(), appendResponse ) );
}
break; break;
} }


case VOTE_REQUEST: case VOTE_REQUEST:
{ {
RaftMessages.Vote.Request<MEMBER> req = (RaftMessages.Vote.Request<MEMBER>) message; Voting.handleVoteRequest( ctx, outcome, (RaftMessages.Vote.Request<MEMBER>) message );

if ( req.term() > ctx.term() )
{
outcome.setNextTerm( req.term() );
outcome.setVotedFor( null );
}

boolean willVoteForCandidate = shouldVoteFor( req.candidate(), outcome.getTerm(), req.term(),
ctx.entryLog().readEntryTerm( ctx.entryLog().appendIndex() ), req.lastLogTerm(),
ctx.entryLog().appendIndex(), req.lastLogIndex(),
outcome.getVotedFor() );

if ( willVoteForCandidate )
{
outcome.setVotedFor( req.from() );
outcome.renewElectionTimeout();
}

outcome.addOutgoingMessage( new RaftMessages.Directed<>( req.from(), new RaftMessages.Vote.Response<>(
ctx.myself(), outcome.getTerm(),
willVoteForCandidate ) ) );
break; break;
} }


Expand Down Expand Up @@ -217,4 +118,5 @@ else if ( logTerm != req.entries()[offset].term() )


return outcome; return outcome;
} }

} }
@@ -0,0 +1,48 @@
/*
* Copyright (c) 2002-2016 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.coreedge.raft.roles;

import org.neo4j.coreedge.raft.RaftMessages;
import org.neo4j.coreedge.raft.log.RaftStorageException;
import org.neo4j.coreedge.raft.outcome.Outcome;
import org.neo4j.coreedge.raft.state.ReadableRaftState;

public class Heart
{
public static <MEMBER> void beat( ReadableRaftState<MEMBER> state, Outcome<MEMBER> outcome, RaftMessages.Heartbeat<MEMBER> request ) throws RaftStorageException
{
if ( request.leaderTerm() < state.term() )
{
return;
}

outcome.renewElectionTimeout();
outcome.setNextTerm( request.leaderTerm() );
outcome.setLeader( request.from() );
outcome.setLeaderCommit( request.commitIndex() );

if ( !Follower.logHistoryMatches( state, request.commitIndex(), request.commitIndexTerm() ) )
{
return;
}

Follower.commitToLogOnUpdate( state, request.commitIndex(), request.commitIndex(), outcome );
}
}

0 comments on commit a0ccdf1

Please sign in to comment.