diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/marshalling/v2/decoding/RaftMessageDecoder.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/marshalling/v2/decoding/RaftMessageDecoder.java index 15d14845087bb..8d07a50c738bc 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/marshalling/v2/decoding/RaftMessageDecoder.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/marshalling/v2/decoding/RaftMessageDecoder.java @@ -74,7 +74,7 @@ public void decode( ChannelHandlerContext ctx, ByteBuf buffer, List list RaftMessages.Type messageType = values[messageTypeWire]; MemberId from = retrieveMember( channel ); - LazyComposer result; + LazyComposer composer; if ( messageType.equals( VOTE_REQUEST ) ) { @@ -84,14 +84,14 @@ public void decode( ChannelHandlerContext ctx, ByteBuf buffer, List list long lastLogIndex = channel.getLong(); long lastLogTerm = channel.getLong(); - result = noContent( new RaftMessages.Vote.Request( from, term, candidate, lastLogIndex, lastLogTerm ) ); + composer = simpleMessageComposer( new RaftMessages.Vote.Request( from, term, candidate, lastLogIndex, lastLogTerm ) ); } else if ( messageType.equals( VOTE_RESPONSE ) ) { long term = channel.getLong(); boolean voteGranted = channel.get() == 1; - result = noContent( new RaftMessages.Vote.Response( from, term, voteGranted ) ); + composer = simpleMessageComposer( new RaftMessages.Vote.Response( from, term, voteGranted ) ); } else if ( messageType.equals( PRE_VOTE_REQUEST ) ) { @@ -101,14 +101,14 @@ else if ( messageType.equals( PRE_VOTE_REQUEST ) ) long lastLogIndex = channel.getLong(); long lastLogTerm = channel.getLong(); - result = noContent( new RaftMessages.PreVote.Request( from, term, candidate, lastLogIndex, lastLogTerm ) ); + composer = simpleMessageComposer( new RaftMessages.PreVote.Request( from, term, candidate, lastLogIndex, lastLogTerm ) ); } else if ( messageType.equals( PRE_VOTE_RESPONSE ) ) { long term = channel.getLong(); boolean voteGranted = channel.get() == 1; - result = noContent( new RaftMessages.PreVote.Response( from, term, voteGranted ) ); + composer = simpleMessageComposer( new RaftMessages.PreVote.Response( from, term, voteGranted ) ); } else if ( messageType.equals( APPEND_ENTRIES_REQUEST ) ) { @@ -117,9 +117,9 @@ else if ( messageType.equals( APPEND_ENTRIES_REQUEST ) ) long prevLogIndex = channel.getLong(); long prevLogTerm = channel.getLong(); long leaderCommit = channel.getLong(); - int raftLogEntries = channel.getInt(); + int entryCount = channel.getInt(); - result = new AppendEntriesComposer( raftLogEntries, from, term, prevLogIndex, prevLogTerm, leaderCommit ); + composer = new AppendEntriesComposer( entryCount, from, term, prevLogIndex, prevLogTerm, leaderCommit ); } else if ( messageType.equals( APPEND_ENTRIES_RESPONSE ) ) { @@ -128,11 +128,11 @@ else if ( messageType.equals( APPEND_ENTRIES_RESPONSE ) ) long matchIndex = channel.getLong(); long appendIndex = channel.getLong(); - result = noContent( new RaftMessages.AppendEntries.Response( from, term, success, matchIndex, appendIndex ) ); + composer = simpleMessageComposer( new RaftMessages.AppendEntries.Response( from, term, success, matchIndex, appendIndex ) ); } else if ( messageType.equals( NEW_ENTRY_REQUEST ) ) { - result = new NewEntryRequestComposer( from ); + composer = new NewEntryRequestComposer( from ); } else if ( messageType.equals( HEARTBEAT ) ) { @@ -140,45 +140,46 @@ else if ( messageType.equals( HEARTBEAT ) ) long commitIndexTerm = channel.getLong(); long commitIndex = channel.getLong(); - result = noContent( new RaftMessages.Heartbeat( from, leaderTerm, commitIndex, commitIndexTerm ) ); + composer = simpleMessageComposer( new RaftMessages.Heartbeat( from, leaderTerm, commitIndex, commitIndexTerm ) ); } else if ( messageType.equals( HEARTBEAT_RESPONSE ) ) { - result = noContent( new RaftMessages.HeartbeatResponse( from ) ); + composer = simpleMessageComposer( new RaftMessages.HeartbeatResponse( from ) ); } else if ( messageType.equals( LOG_COMPACTION_INFO ) ) { long leaderTerm = channel.getLong(); long prevIndex = channel.getLong(); - result = noContent( new RaftMessages.LogCompactionInfo( from, leaderTerm, prevIndex ) ); + composer = simpleMessageComposer( new RaftMessages.LogCompactionInfo( from, leaderTerm, prevIndex ) ); } else { throw new IllegalArgumentException( "Unknown message type" ); } - list.add( new ClusterIdAwareMessageComposer( result, clusterId ) ); + list.add( new ClusterIdAwareMessageComposer( composer, clusterId ) ); protocol.expect( ContentType.ContentType ); } static class ClusterIdAwareMessageComposer { - private final LazyComposer result; + private final LazyComposer composer; private final ClusterId clusterId; - ClusterIdAwareMessageComposer( LazyComposer result, ClusterId clusterId ) + ClusterIdAwareMessageComposer( LazyComposer composer, ClusterId clusterId ) { - this.result = result; + this.composer = composer; this.clusterId = clusterId; } RaftMessages.ClusterIdAwareMessage maybeCompose( Clock clock, Queue logEntryTerms, Queue replicatedContents ) { - RaftMessages.RaftMessage apply = result.apply( logEntryTerms, replicatedContents ); - if ( apply != null ) + RaftMessages.RaftMessage composedMessage = composer.apply( logEntryTerms, replicatedContents ); + + if ( composedMessage != null ) { - return RaftMessages.ReceivedInstantClusterIdAwareMessage.of( clock.instant(), clusterId, apply ); + return RaftMessages.ReceivedInstantClusterIdAwareMessage.of( clock.instant(), clusterId, composedMessage ); } else { @@ -200,23 +201,26 @@ interface LazyComposer extends BiFunction,Queue,R { } - private static LazyComposer noContent( RaftMessages.RaftMessage message ) + /** + * A message without internal content components. + */ + private static LazyComposer simpleMessageComposer( RaftMessages.RaftMessage message ) { - return ( rle, rc ) -> message; + return ( terms, contents ) -> message; } private static class AppendEntriesComposer implements LazyComposer { - private final int raftLogEntries; + private final int entryCount; private final MemberId from; private final long term; private final long prevLogIndex; private final long prevLogTerm; private final long leaderCommit; - AppendEntriesComposer( int raftLogEntries, MemberId from, long term, long prevLogIndex, long prevLogTerm, long leaderCommit ) + AppendEntriesComposer( int entryCount, MemberId from, long term, long prevLogIndex, long prevLogTerm, long leaderCommit ) { - this.raftLogEntries = raftLogEntries; + this.entryCount = entryCount; this.from = from; this.term = term; this.prevLogIndex = prevLogIndex; @@ -225,23 +229,23 @@ private static class AppendEntriesComposer implements LazyComposer } @Override - public RaftMessages.BaseRaftMessage apply( Queue rle, Queue rc ) + public RaftMessages.BaseRaftMessage apply( Queue terms, Queue contents ) { - if ( rle.size() < raftLogEntries || rc.size() < raftLogEntries ) + if ( terms.size() < entryCount || contents.size() < entryCount ) { return null; } else { - RaftLogEntry[] entries = new RaftLogEntry[raftLogEntries]; - for ( int i = 0; i < raftLogEntries; i++ ) + RaftLogEntry[] entries = new RaftLogEntry[entryCount]; + for ( int i = 0; i < entryCount; i++ ) { - Long poll = rle.poll(); - if ( poll == null ) + Long term = terms.poll(); + if ( term == null ) { throw new IllegalArgumentException( "Term cannot be null" ); } - entries[i] = new RaftLogEntry( poll, rc.poll() ); + entries[i] = new RaftLogEntry( term, contents.poll() ); } return new RaftMessages.AppendEntries.Request( from, term, prevLogIndex, prevLogTerm, entries, leaderCommit ); } @@ -258,15 +262,15 @@ private static class NewEntryRequestComposer implements LazyComposer } @Override - public RaftMessages.BaseRaftMessage apply( Queue rle, Queue rc ) + public RaftMessages.BaseRaftMessage apply( Queue terms, Queue contents ) { - if ( rc.isEmpty() ) + if ( contents.isEmpty() ) { return null; } else { - return new RaftMessages.NewEntry.Request( from, rc.poll() ); + return new RaftMessages.NewEntry.Request( from, contents.poll() ); } } }