Skip to content

Commit

Permalink
Change name of local variables and extract classes
Browse files Browse the repository at this point in the history
  • Loading branch information
RagnarW authored and martinfurmanski committed Jun 11, 2018
1 parent bdd6459 commit cb951f4
Show file tree
Hide file tree
Showing 6 changed files with 104 additions and 68 deletions.
Expand Up @@ -35,6 +35,7 @@ public class ChunkedReplicatedContent implements Marshal, ChunkedInput<ByteBuf>
{ {


private static final int DEFAULT_CHUNK_SIZE = 8192; private static final int DEFAULT_CHUNK_SIZE = 8192;
private static final int MINIMUM_CHUNK_SIZE = 7;
private final byte contentType; private final byte contentType;
private final ByteBufAwareMarshal byteBufAwareMarshal; private final ByteBufAwareMarshal byteBufAwareMarshal;
private final int chunkSize; private final int chunkSize;
Expand All @@ -45,9 +46,9 @@ public ChunkedReplicatedContent( byte contentType, ByteBufAwareMarshal byteBufAw
{ {
this.byteBufAwareMarshal = byteBufAwareMarshal; this.byteBufAwareMarshal = byteBufAwareMarshal;
this.chunkSize = chunkSize; this.chunkSize = chunkSize;
if ( chunkSize < 7 ) if ( chunkSize < MINIMUM_CHUNK_SIZE )
{ {
throw new IllegalArgumentException( "Chunk size must be at least 4 bytes" ); throw new IllegalArgumentException( "Chunk size must be at least " + MINIMUM_CHUNK_SIZE + " bytes" );
} }
this.contentType = contentType; this.contentType = contentType;
} }
Expand Down Expand Up @@ -111,7 +112,7 @@ public ByteBuf readChunk( ByteBufAllocator allocator ) throws IOException
progress += buffer.readableBytes(); progress += buffer.readableBytes();
return buffer; return buffer;
} }
catch ( IOException e ) catch ( Throwable e )
{ {
buffer.release(); buffer.release();
throw e; throw e;
Expand Down
Expand Up @@ -29,15 +29,15 @@ public enum ContentType
RaftLogEntryTerms( (byte) 2 ), RaftLogEntryTerms( (byte) 2 ),
Message( (byte) 3 ); Message( (byte) 3 );


private final byte b; private final byte messageCode;


ContentType( byte b ) ContentType( byte messageCode )
{ {
this.b = b; this.messageCode = messageCode;
} }


public byte get() public byte get()
{ {
return b; return messageCode;
} }
} }
Expand Up @@ -45,8 +45,8 @@ protected void decode( ChannelHandlerContext ctx, ByteBuf in, List<Object> out )
{ {
if ( contentTypeProtocol.isExpecting( ContentType.ContentType ) ) if ( contentTypeProtocol.isExpecting( ContentType.ContentType ) )
{ {
byte b = in.readByte(); byte messageCode = in.readByte();
ContentType contentType = getContentType( b ); ContentType contentType = getContentType( messageCode );
contentTypeProtocol.expect( contentType ); contentTypeProtocol.expect( contentType );
} }
else else
Expand All @@ -55,15 +55,15 @@ protected void decode( ChannelHandlerContext ctx, ByteBuf in, List<Object> out )
} }
} }


private ContentType getContentType( byte b ) private ContentType getContentType( byte messageCode )
{ {
for ( ContentType contentType : ContentType.values() ) for ( ContentType contentType : ContentType.values() )
{ {
if ( contentType.get() == b ) if ( contentType.get() == messageCode )
{ {
return contentType; return contentType;
} }
} }
throw new IllegalArgumentException( "Illegal inbound. Could not find a ContentType with value " + b ); throw new IllegalArgumentException( "Illegal inbound. Could not find a ContentType with value " + messageCode );
} }
} }
Expand Up @@ -38,7 +38,7 @@ public class RaftMessageComposer extends MessageToMessageDecoder<Object>
{ {
private final Queue<ReplicatedContent> replicatedContents = new LinkedBlockingQueue<>(); private final Queue<ReplicatedContent> replicatedContents = new LinkedBlockingQueue<>();
private final Queue<Long> raftLogEntryTerms = new LinkedBlockingQueue<>(); private final Queue<Long> raftLogEntryTerms = new LinkedBlockingQueue<>();
private RaftMessageDecoder.RaftMessageCreator messageCreator; private RaftMessageDecoder.ClusterIdAwareMessageComposer messageComposer;
private final Clock clock; private final Clock clock;


public RaftMessageComposer( Clock clock ) public RaftMessageComposer( Clock clock )
Expand All @@ -60,21 +60,21 @@ else if ( msg instanceof RaftLogEntryTermsDecoder.RaftLogEntryTerms )
raftLogEntryTerms.add( term ); raftLogEntryTerms.add( term );
} }
} }
else if ( msg instanceof RaftMessageDecoder.RaftMessageCreator ) else if ( msg instanceof RaftMessageDecoder.ClusterIdAwareMessageComposer )
{ {
if ( messageCreator != null ) if ( messageComposer != null )
{ {
throw new IllegalStateException( "Received raft message header. Pipeline already contains message header waiting to build." ); throw new IllegalStateException( "Received raft message header. Pipeline already contains message header waiting to build." );
} }
messageCreator = (RaftMessageDecoder.RaftMessageCreator) msg; messageComposer = (RaftMessageDecoder.ClusterIdAwareMessageComposer) msg;
} }
else else
{ {
throw new IllegalStateException( "Unexpected object in the pipeline: " + msg ); throw new IllegalStateException( "Unexpected object in the pipeline: " + msg );
} }
if ( messageCreator != null ) if ( messageComposer != null )
{ {
RaftMessages.ClusterIdAwareMessage clusterIdAwareMessage = messageCreator.maybeCompose( clock, raftLogEntryTerms, replicatedContents ); RaftMessages.ClusterIdAwareMessage clusterIdAwareMessage = messageComposer.maybeCompose( clock, raftLogEntryTerms, replicatedContents );
if ( clusterIdAwareMessage != null ) if ( clusterIdAwareMessage != null )
{ {
clear( clusterIdAwareMessage ); clear( clusterIdAwareMessage );
Expand All @@ -85,7 +85,7 @@ else if ( msg instanceof RaftMessageDecoder.RaftMessageCreator )


private void clear( RaftMessages.ClusterIdAwareMessage message ) private void clear( RaftMessages.ClusterIdAwareMessage message )
{ {
messageCreator = null; messageComposer = null;
if ( !replicatedContents.isEmpty() || !raftLogEntryTerms.isEmpty() ) if ( !replicatedContents.isEmpty() || !raftLogEntryTerms.isEmpty() )
{ {
throw new IllegalStateException( String.format( throw new IllegalStateException( String.format(
Expand Down
Expand Up @@ -74,7 +74,7 @@ public void decode( ChannelHandlerContext ctx, ByteBuf buffer, List<Object> list
RaftMessages.Type messageType = values[messageTypeWire]; RaftMessages.Type messageType = values[messageTypeWire];


MemberId from = retrieveMember( channel ); MemberId from = retrieveMember( channel );
BiFunction<Queue<Long>,Queue<ReplicatedContent>,RaftMessages.BaseRaftMessage> result; LazyComposer result;


if ( messageType.equals( VOTE_REQUEST ) ) if ( messageType.equals( VOTE_REQUEST ) )
{ {
Expand Down Expand Up @@ -119,27 +119,7 @@ else if ( messageType.equals( APPEND_ENTRIES_REQUEST ) )
long leaderCommit = channel.getLong(); long leaderCommit = channel.getLong();
int raftLogEntries = channel.getInt(); int raftLogEntries = channel.getInt();


result = ( rle, rc ) -> result = new AppendEntriesComposer( raftLogEntries, from, term, prevLogIndex, prevLogTerm, leaderCommit );
{
if ( rle.size() < raftLogEntries || rc.size() < raftLogEntries )
{
return null;
}
else
{
RaftLogEntry[] entries = new RaftLogEntry[raftLogEntries];
for ( int i = 0; i < raftLogEntries; i++ )
{
Long poll = rle.poll();
if ( poll == null )
{
throw new IllegalArgumentException( "Term cannot be null" );
}
entries[i] = new RaftLogEntry( poll, rc.poll() );
}
return new RaftMessages.AppendEntries.Request( from, term, prevLogIndex, prevLogTerm, entries, leaderCommit );
}
};
} }
else if ( messageType.equals( APPEND_ENTRIES_RESPONSE ) ) else if ( messageType.equals( APPEND_ENTRIES_RESPONSE ) )
{ {
Expand All @@ -152,17 +132,7 @@ else if ( messageType.equals( APPEND_ENTRIES_RESPONSE ) )
} }
else if ( messageType.equals( NEW_ENTRY_REQUEST ) ) else if ( messageType.equals( NEW_ENTRY_REQUEST ) )
{ {
result = ( rle, rc ) -> result = new NewEntryRequestComposer( from );
{
if ( rc.isEmpty() )
{
return null;
}
else
{
return new RaftMessages.NewEntry.Request( from, rc.poll() );
}
};
} }
else if ( messageType.equals( HEARTBEAT ) ) else if ( messageType.equals( HEARTBEAT ) )
{ {
Expand All @@ -188,29 +158,24 @@ else if ( messageType.equals( LOG_COMPACTION_INFO ) )
throw new IllegalArgumentException( "Unknown message type" ); throw new IllegalArgumentException( "Unknown message type" );
} }


list.add( new RaftMessageCreator( result, clusterId ) ); list.add( new ClusterIdAwareMessageComposer( result, clusterId ) );
protocol.expect( ContentType.ContentType ); protocol.expect( ContentType.ContentType );
} }


private BiFunction<Queue<Long>,Queue<ReplicatedContent>,RaftMessages.BaseRaftMessage> noContent( RaftMessages.BaseRaftMessage message ) static class ClusterIdAwareMessageComposer
{ {
return ( rle, rc ) -> message; private final LazyComposer result;
}

static class RaftMessageCreator
{
private final BiFunction<Queue<Long>,Queue<ReplicatedContent>,RaftMessages.BaseRaftMessage> result;
private final ClusterId clusterId; private final ClusterId clusterId;


RaftMessageCreator( BiFunction<Queue<Long>,Queue<ReplicatedContent>,RaftMessages.BaseRaftMessage> result, ClusterId clusterId ) ClusterIdAwareMessageComposer( LazyComposer result, ClusterId clusterId )
{ {
this.result = result; this.result = result;
this.clusterId = clusterId; this.clusterId = clusterId;
} }


RaftMessages.ClusterIdAwareMessage maybeCompose( Clock clock, Queue<Long> logEntryTerms, Queue<ReplicatedContent> replicatedContents ) RaftMessages.ClusterIdAwareMessage maybeCompose( Clock clock, Queue<Long> logEntryTerms, Queue<ReplicatedContent> replicatedContents )
{ {
RaftMessages.BaseRaftMessage apply = result.apply( logEntryTerms, replicatedContents ); RaftMessages.RaftMessage apply = result.apply( logEntryTerms, replicatedContents );
if ( apply != null ) if ( apply != null )
{ {
return RaftMessages.ReceivedInstantClusterIdAwareMessage.of( clock.instant(), clusterId, apply ); return RaftMessages.ReceivedInstantClusterIdAwareMessage.of( clock.instant(), clusterId, apply );
Expand All @@ -227,4 +192,77 @@ private MemberId retrieveMember( ReadableChannel buffer ) throws IOException, En
MemberId.Marshal memberIdMarshal = new MemberId.Marshal(); MemberId.Marshal memberIdMarshal = new MemberId.Marshal();
return memberIdMarshal.unmarshal( buffer ); return memberIdMarshal.unmarshal( buffer );
} }

/**
* Builds the raft message. Should return {@code null} if provided collections does not contain enough data for building the message.
*/
interface LazyComposer extends BiFunction<Queue<Long>,Queue<ReplicatedContent>,RaftMessages.RaftMessage> {}

private static LazyComposer noContent( RaftMessages.RaftMessage message )
{
return ( rle, rc ) -> message;
}

private static class AppendEntriesComposer implements LazyComposer
{
private final int raftLogEntries;
private final MemberId from;
private final long term;
private final long prevLogIndex;
private final long prevLogTerm;
private final long leaderCommit;

AppendEntriesComposer( int raftLogEntries, MemberId from, long term, long prevLogIndex, long prevLogTerm, long leaderCommit )
{
this.raftLogEntries = raftLogEntries;
this.from = from;
this.term = term;
this.prevLogIndex = prevLogIndex;
this.prevLogTerm = prevLogTerm;
this.leaderCommit = leaderCommit;
}

@Override
public RaftMessages.BaseRaftMessage apply( Queue<Long> rle, Queue<ReplicatedContent> rc )
{
if ( rle.size() < raftLogEntries || rc.size() < raftLogEntries )
{
return null;
}
else
{
RaftLogEntry[] entries = new RaftLogEntry[raftLogEntries];
for ( int i = 0; i < raftLogEntries; i++ )
{
Long poll = rle.poll();
if ( poll == null )
{
throw new IllegalArgumentException( "Term cannot be null" );
}
entries[i] = new RaftLogEntry( poll, rc.poll() );
}
return new RaftMessages.AppendEntries.Request( from, term, prevLogIndex, prevLogTerm, entries, leaderCommit );
}
}
}

private static class NewEntryRequestComposer implements LazyComposer
{
private final MemberId from;

NewEntryRequestComposer( MemberId from ) {this.from = from;}

@Override
public RaftMessages.BaseRaftMessage apply( Queue<Long> rle, Queue<ReplicatedContent> rc )
{
if ( rc.isEmpty() )
{
return null;
}
else
{
return new RaftMessages.NewEntry.Request( from, rc.poll() );
}
}
}
} }
Expand Up @@ -27,12 +27,9 @@
import java.time.Clock; import java.time.Clock;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Queue;
import java.util.UUID; import java.util.UUID;
import java.util.function.BiFunction;


import org.neo4j.causalclustering.core.consensus.RaftMessages; import org.neo4j.causalclustering.core.consensus.RaftMessages;
import org.neo4j.causalclustering.core.replication.ReplicatedContent;
import org.neo4j.causalclustering.core.state.machines.tx.ReplicatedTransaction; import org.neo4j.causalclustering.core.state.machines.tx.ReplicatedTransaction;
import org.neo4j.causalclustering.identity.ClusterId; import org.neo4j.causalclustering.identity.ClusterId;


Expand All @@ -41,7 +38,7 @@
import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.fail; import static org.junit.jupiter.api.Assertions.fail;


public class RaftMessageComposerTest public class ClusterIdAwareMessageComposerTest
{ {


@Test @Test
Expand Down Expand Up @@ -105,8 +102,8 @@ private RaftMessages.PruneRequest dummyRequest()
return new RaftMessages.PruneRequest( 1 ); return new RaftMessages.PruneRequest( 1 );
} }


private RaftMessageDecoder.RaftMessageCreator messageCreator( BiFunction<Queue<Long>,Queue<ReplicatedContent>,RaftMessages.BaseRaftMessage> biFunction ) private RaftMessageDecoder.ClusterIdAwareMessageComposer messageCreator( RaftMessageDecoder.LazyComposer biFunction )
{ {
return new RaftMessageDecoder.RaftMessageCreator( biFunction, new ClusterId( UUID.randomUUID() ) ); return new RaftMessageDecoder.ClusterIdAwareMessageComposer( biFunction, new ClusterId( UUID.randomUUID() ) );
} }
} }

0 comments on commit cb951f4

Please sign in to comment.