Skip to content

Commit

Permalink
Refactor enriched raft message hierarchy, BatchingMessageHandler
Browse files Browse the repository at this point in the history
  • Loading branch information
andrewkerr9000 committed Dec 18, 2017
1 parent a188039 commit 2b9e03e
Show file tree
Hide file tree
Showing 2 changed files with 165 additions and 115 deletions.
Expand Up @@ -19,7 +19,6 @@
*/ */
package org.neo4j.causalclustering.core; package org.neo4j.causalclustering.core;


import java.time.Instant;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ArrayBlockingQueue;
Expand All @@ -29,7 +28,7 @@


import org.neo4j.causalclustering.core.consensus.ContinuousJob; import org.neo4j.causalclustering.core.consensus.ContinuousJob;
import org.neo4j.causalclustering.core.consensus.RaftMessages; import org.neo4j.causalclustering.core.consensus.RaftMessages;
import org.neo4j.causalclustering.core.consensus.RaftMessages.RaftMessage; import org.neo4j.causalclustering.core.replication.ReplicatedContent;
import org.neo4j.causalclustering.identity.ClusterId; import org.neo4j.causalclustering.identity.ClusterId;
import org.neo4j.causalclustering.messaging.LifecycleMessageHandler; import org.neo4j.causalclustering.messaging.LifecycleMessageHandler;
import org.neo4j.causalclustering.messaging.ComposableMessageHandler; import org.neo4j.causalclustering.messaging.ComposableMessageHandler;
Expand All @@ -47,6 +46,7 @@ class BatchingMessageHandler implements Runnable, LifecycleMessageHandler<RaftMe
private final List<RaftMessages.ReceivedInstantClusterIdAwareMessage> batch; private final List<RaftMessages.ReceivedInstantClusterIdAwareMessage> batch;
private final BlockingQueue<RaftMessages.ReceivedInstantClusterIdAwareMessage> messageQueue; private final BlockingQueue<RaftMessages.ReceivedInstantClusterIdAwareMessage> messageQueue;
private final ContinuousJob job; private final ContinuousJob job;
private final ContentHandler contentHandler = new ContentHandler();


private volatile boolean stopped; private volatile boolean stopped;


Expand All @@ -63,7 +63,7 @@ class BatchingMessageHandler implements Runnable, LifecycleMessageHandler<RaftMe


static ComposableMessageHandler composable( int queueSize, int maxBatch, Function<Runnable,ContinuousJob> jobSchedulerFactory, LogProvider logProvider ) static ComposableMessageHandler composable( int queueSize, int maxBatch, Function<Runnable,ContinuousJob> jobSchedulerFactory, LogProvider logProvider )
{ {
return (LifecycleMessageHandler<RaftMessages.ReceivedInstantClusterIdAwareMessage> delegate) -> return ( LifecycleMessageHandler<RaftMessages.ReceivedInstantClusterIdAwareMessage> delegate ) ->
new BatchingMessageHandler( delegate, queueSize, maxBatch, jobSchedulerFactory, logProvider ); new BatchingMessageHandler( delegate, queueSize, maxBatch, jobSchedulerFactory, logProvider );
} }
@Override @Override
Expand Down Expand Up @@ -131,45 +131,128 @@ private void drain( BlockingQueue<RaftMessages.ReceivedInstantClusterIdAwareMess
batch.addAll( tempDraining ); batch.addAll( tempDraining );
} }


// TODO consider refactoring
// BatchRequest is cluster & received at aware
// Replace instanceOf with visitor
private void collateAndHandleBatch( List<RaftMessages.ReceivedInstantClusterIdAwareMessage> batch ) private void collateAndHandleBatch( List<RaftMessages.ReceivedInstantClusterIdAwareMessage> batch )
{ {
RaftMessages.ReceivedInstantClusterIdAwareMessage firstMessage = batch.get( 0 ); RaftMessages.ReceivedInstantClusterIdAwareMessage<RaftMessages.NewEntry.BatchRequest> batchRequest = null;
RaftMessages.NewEntry.BatchRequest batchRequest = null;
Instant firstReceivedAt = firstMessage.receivedAt();
ClusterId clusterId = firstMessage.clusterId();


for ( RaftMessages.ReceivedInstantClusterIdAwareMessage wrappedMessage : batch ) for ( RaftMessages.ReceivedInstantClusterIdAwareMessage message : batch )
{ {
if ( batchRequest != null && !wrappedMessage.clusterId().equals( clusterId )) if ( batchRequest != null && !message.clusterId().equals( batchRequest.clusterId() ) )
{ {
handler.handle( RaftMessages.ReceivedInstantClusterIdAwareMessage.of( firstReceivedAt, clusterId, batchRequest ) ); handler.handle( batchRequest );
batchRequest = null; batchRequest = null;
} }
clusterId = wrappedMessage.clusterId();
RaftMessage message = wrappedMessage.message();
if ( message instanceof RaftMessages.NewEntry.Request )
{
RaftMessages.NewEntry.Request newEntryRequest = (RaftMessages.NewEntry.Request) message;


ReplicatedContent replicatedContent = message.dispatch( contentHandler );
if ( replicatedContent != null )
{
if ( batchRequest == null ) if ( batchRequest == null )
{ {
batchRequest = new RaftMessages.NewEntry.BatchRequest( batch.size() ); batchRequest =
firstReceivedAt = wrappedMessage.receivedAt(); RaftMessages.ReceivedInstantClusterIdAwareMessage.of(
message.receivedAt(),
message.clusterId(),
new RaftMessages.NewEntry.BatchRequest( batch.size() )
);
} }
batchRequest.add( newEntryRequest.content() ); batchRequest.message().add( replicatedContent );
} }
else else
{ {
handler.handle( wrappedMessage ); handler.handle( message );
} }
} }


if ( batchRequest != null ) if ( batchRequest != null )
{ {
handler.handle( RaftMessages.ReceivedInstantClusterIdAwareMessage.of( firstReceivedAt, clusterId, batchRequest ) ); handler.handle( batchRequest );
}
}

class ContentHandler implements RaftMessages.Handler<ReplicatedContent, RuntimeException>
{
@Override
public ReplicatedContent handle( RaftMessages.NewEntry.Request request ) throws RuntimeException
{
return request.content();
}

@Override
public ReplicatedContent handle( RaftMessages.NewEntry.BatchRequest batchRequest ) throws RuntimeException
{
return null;
}

@Override
public ReplicatedContent handle( RaftMessages.Vote.Request request ) throws RuntimeException
{
return null;
}

@Override
public ReplicatedContent handle( RaftMessages.Vote.Response response ) throws RuntimeException
{
return null;
}

@Override
public ReplicatedContent handle( RaftMessages.PreVote.Request request ) throws RuntimeException
{
return null;
}

@Override
public ReplicatedContent handle( RaftMessages.PreVote.Response response ) throws RuntimeException
{
return null;
}

@Override
public ReplicatedContent handle( RaftMessages.AppendEntries.Request request ) throws RuntimeException
{
return null;
}

@Override
public ReplicatedContent handle( RaftMessages.AppendEntries.Response response ) throws RuntimeException
{
return null;
}

@Override
public ReplicatedContent handle( RaftMessages.Heartbeat heartbeat ) throws RuntimeException
{
return null;
}

@Override
public ReplicatedContent handle( RaftMessages.LogCompactionInfo logCompactionInfo ) throws RuntimeException
{
return null;
}

@Override
public ReplicatedContent handle( RaftMessages.HeartbeatResponse heartbeatResponse ) throws RuntimeException
{
return null;
}

@Override
public ReplicatedContent handle( RaftMessages.Timeout.Election election ) throws RuntimeException
{
return null;
}

@Override
public ReplicatedContent handle( RaftMessages.Timeout.Heartbeat heartbeat ) throws RuntimeException
{
return null;
}

@Override
public ReplicatedContent handle( RaftMessages.PruneRequest pruneRequest ) throws RuntimeException
{
return null;
} }
} }
} }

0 comments on commit 2b9e03e

Please sign in to comment.