Skip to content

Commit

Permalink
Improve naming in raft message decoder
Browse files Browse the repository at this point in the history
  • Loading branch information
martinfurmanski committed Jun 11, 2018
1 parent de51097 commit 72d46d1
Showing 1 changed file with 38 additions and 34 deletions.
Expand Up @@ -74,7 +74,7 @@ public void decode( ChannelHandlerContext ctx, ByteBuf buffer, List<Object> list
RaftMessages.Type messageType = values[messageTypeWire];

MemberId from = retrieveMember( channel );
LazyComposer result;
LazyComposer composer;

if ( messageType.equals( VOTE_REQUEST ) )
{
Expand All @@ -84,14 +84,14 @@ public void decode( ChannelHandlerContext ctx, ByteBuf buffer, List<Object> list
long lastLogIndex = channel.getLong();
long lastLogTerm = channel.getLong();

result = noContent( new RaftMessages.Vote.Request( from, term, candidate, lastLogIndex, lastLogTerm ) );
composer = simpleMessageComposer( new RaftMessages.Vote.Request( from, term, candidate, lastLogIndex, lastLogTerm ) );
}
else if ( messageType.equals( VOTE_RESPONSE ) )
{
long term = channel.getLong();
boolean voteGranted = channel.get() == 1;

result = noContent( new RaftMessages.Vote.Response( from, term, voteGranted ) );
composer = simpleMessageComposer( new RaftMessages.Vote.Response( from, term, voteGranted ) );
}
else if ( messageType.equals( PRE_VOTE_REQUEST ) )
{
Expand All @@ -101,14 +101,14 @@ else if ( messageType.equals( PRE_VOTE_REQUEST ) )
long lastLogIndex = channel.getLong();
long lastLogTerm = channel.getLong();

result = noContent( new RaftMessages.PreVote.Request( from, term, candidate, lastLogIndex, lastLogTerm ) );
composer = simpleMessageComposer( new RaftMessages.PreVote.Request( from, term, candidate, lastLogIndex, lastLogTerm ) );
}
else if ( messageType.equals( PRE_VOTE_RESPONSE ) )
{
long term = channel.getLong();
boolean voteGranted = channel.get() == 1;

result = noContent( new RaftMessages.PreVote.Response( from, term, voteGranted ) );
composer = simpleMessageComposer( new RaftMessages.PreVote.Response( from, term, voteGranted ) );
}
else if ( messageType.equals( APPEND_ENTRIES_REQUEST ) )
{
Expand All @@ -117,9 +117,9 @@ else if ( messageType.equals( APPEND_ENTRIES_REQUEST ) )
long prevLogIndex = channel.getLong();
long prevLogTerm = channel.getLong();
long leaderCommit = channel.getLong();
int raftLogEntries = channel.getInt();
int entryCount = channel.getInt();

result = new AppendEntriesComposer( raftLogEntries, from, term, prevLogIndex, prevLogTerm, leaderCommit );
composer = new AppendEntriesComposer( entryCount, from, term, prevLogIndex, prevLogTerm, leaderCommit );
}
else if ( messageType.equals( APPEND_ENTRIES_RESPONSE ) )
{
Expand All @@ -128,57 +128,58 @@ else if ( messageType.equals( APPEND_ENTRIES_RESPONSE ) )
long matchIndex = channel.getLong();
long appendIndex = channel.getLong();

result = noContent( new RaftMessages.AppendEntries.Response( from, term, success, matchIndex, appendIndex ) );
composer = simpleMessageComposer( new RaftMessages.AppendEntries.Response( from, term, success, matchIndex, appendIndex ) );
}
else if ( messageType.equals( NEW_ENTRY_REQUEST ) )
{
result = new NewEntryRequestComposer( from );
composer = new NewEntryRequestComposer( from );
}
else if ( messageType.equals( HEARTBEAT ) )
{
long leaderTerm = channel.getLong();
long commitIndexTerm = channel.getLong();
long commitIndex = channel.getLong();

result = noContent( new RaftMessages.Heartbeat( from, leaderTerm, commitIndex, commitIndexTerm ) );
composer = simpleMessageComposer( new RaftMessages.Heartbeat( from, leaderTerm, commitIndex, commitIndexTerm ) );
}
else if ( messageType.equals( HEARTBEAT_RESPONSE ) )
{
result = noContent( new RaftMessages.HeartbeatResponse( from ) );
composer = simpleMessageComposer( new RaftMessages.HeartbeatResponse( from ) );
}
else if ( messageType.equals( LOG_COMPACTION_INFO ) )
{
long leaderTerm = channel.getLong();
long prevIndex = channel.getLong();

result = noContent( new RaftMessages.LogCompactionInfo( from, leaderTerm, prevIndex ) );
composer = simpleMessageComposer( new RaftMessages.LogCompactionInfo( from, leaderTerm, prevIndex ) );
}
else
{
throw new IllegalArgumentException( "Unknown message type" );
}

list.add( new ClusterIdAwareMessageComposer( result, clusterId ) );
list.add( new ClusterIdAwareMessageComposer( composer, clusterId ) );
protocol.expect( ContentType.ContentType );
}

static class ClusterIdAwareMessageComposer
{
private final LazyComposer result;
private final LazyComposer composer;
private final ClusterId clusterId;

ClusterIdAwareMessageComposer( LazyComposer result, ClusterId clusterId )
ClusterIdAwareMessageComposer( LazyComposer composer, ClusterId clusterId )
{
this.result = result;
this.composer = composer;
this.clusterId = clusterId;
}

RaftMessages.ClusterIdAwareMessage maybeCompose( Clock clock, Queue<Long> logEntryTerms, Queue<ReplicatedContent> replicatedContents )
{
RaftMessages.RaftMessage apply = result.apply( logEntryTerms, replicatedContents );
if ( apply != null )
RaftMessages.RaftMessage composedMessage = composer.apply( logEntryTerms, replicatedContents );

if ( composedMessage != null )
{
return RaftMessages.ReceivedInstantClusterIdAwareMessage.of( clock.instant(), clusterId, apply );
return RaftMessages.ReceivedInstantClusterIdAwareMessage.of( clock.instant(), clusterId, composedMessage );
}
else
{
Expand All @@ -200,23 +201,26 @@ interface LazyComposer extends BiFunction<Queue<Long>,Queue<ReplicatedContent>,R
{
}

private static LazyComposer noContent( RaftMessages.RaftMessage message )
/**
* A message without internal content components.
*/
private static LazyComposer simpleMessageComposer( RaftMessages.RaftMessage message )
{
return ( rle, rc ) -> message;
return ( terms, contents ) -> message;
}

private static class AppendEntriesComposer implements LazyComposer
{
private final int raftLogEntries;
private final int entryCount;
private final MemberId from;
private final long term;
private final long prevLogIndex;
private final long prevLogTerm;
private final long leaderCommit;

AppendEntriesComposer( int raftLogEntries, MemberId from, long term, long prevLogIndex, long prevLogTerm, long leaderCommit )
AppendEntriesComposer( int entryCount, MemberId from, long term, long prevLogIndex, long prevLogTerm, long leaderCommit )
{
this.raftLogEntries = raftLogEntries;
this.entryCount = entryCount;
this.from = from;
this.term = term;
this.prevLogIndex = prevLogIndex;
Expand All @@ -225,23 +229,23 @@ private static class AppendEntriesComposer implements LazyComposer
}

@Override
public RaftMessages.BaseRaftMessage apply( Queue<Long> rle, Queue<ReplicatedContent> rc )
public RaftMessages.BaseRaftMessage apply( Queue<Long> terms, Queue<ReplicatedContent> contents )
{
if ( rle.size() < raftLogEntries || rc.size() < raftLogEntries )
if ( terms.size() < entryCount || contents.size() < entryCount )
{
return null;
}
else
{
RaftLogEntry[] entries = new RaftLogEntry[raftLogEntries];
for ( int i = 0; i < raftLogEntries; i++ )
RaftLogEntry[] entries = new RaftLogEntry[entryCount];
for ( int i = 0; i < entryCount; i++ )
{
Long poll = rle.poll();
if ( poll == null )
Long term = terms.poll();
if ( term == null )
{
throw new IllegalArgumentException( "Term cannot be null" );
}
entries[i] = new RaftLogEntry( poll, rc.poll() );
entries[i] = new RaftLogEntry( term, contents.poll() );
}
return new RaftMessages.AppendEntries.Request( from, term, prevLogIndex, prevLogTerm, entries, leaderCommit );
}
Expand All @@ -258,15 +262,15 @@ private static class NewEntryRequestComposer implements LazyComposer
}

@Override
public RaftMessages.BaseRaftMessage apply( Queue<Long> rle, Queue<ReplicatedContent> rc )
public RaftMessages.BaseRaftMessage apply( Queue<Long> terms, Queue<ReplicatedContent> contents )
{
if ( rc.isEmpty() )
if ( contents.isEmpty() )
{
return null;
}
else
{
return new RaftMessages.NewEntry.Request( from, rc.poll() );
return new RaftMessages.NewEntry.Request( from, contents.poll() );
}
}
}
Expand Down

0 comments on commit 72d46d1

Please sign in to comment.