Skip to content

Commit

Permalink
Moving StoreId checking down a layer so that we can do message
Browse files Browse the repository at this point in the history
verification of messages sequentially.

We previously had a race condition where we'd start downloading
a store and if we processed another message before we'd finished
downloading we could end up thinking we had a mismatching StoreId
on a non empty store because our StoreId hadn't updated yet

make the batch handle raft messages
  • Loading branch information
Mark Needham committed Jul 7, 2016
1 parent 93049cd commit 551165b
Show file tree
Hide file tree
Showing 9 changed files with 225 additions and 178 deletions.
Expand Up @@ -24,26 +24,35 @@
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

import org.neo4j.coreedge.catchup.storecopy.LocalDatabase;
import org.neo4j.coreedge.raft.RaftMessages.RaftMessage;
import org.neo4j.coreedge.raft.net.Inbound.MessageHandler;
import org.neo4j.coreedge.server.StoreId;
import org.neo4j.logging.Log;
import org.neo4j.logging.LogProvider;

import static java.util.concurrent.TimeUnit.SECONDS;

public class BatchingMessageHandler implements Runnable, MessageHandler<RaftMessage>
public class BatchingMessageHandler implements Runnable, MessageHandler<RaftMessages.StoreIdAwareMessage>, MismatchedStoreIdService
{
private final Log log;
private final MessageHandler<RaftMessage> innerHandler;
private final BlockingQueue<RaftMessages.StoreIdAwareMessage> messageQueue;

private final BlockingQueue<RaftMessage> messageQueue;
private final int maxBatch;
private final List<RaftMessage> batch;
private final List<RaftMessages.RaftMessage> batch;

private final LocalDatabase localDatabase;
private RaftStateMachine raftStateMachine;
private final List<MismatchedStoreListener> listeners = new ArrayList<>( );

public BatchingMessageHandler( MessageHandler<RaftMessage> innerHandler, LogProvider logProvider,
int queueSize, int maxBatch )
int queueSize, int maxBatch, LocalDatabase localDatabase,
RaftStateMachine raftStateMachine )
{
this.innerHandler = innerHandler;
this.localDatabase = localDatabase;
this.raftStateMachine = raftStateMachine;
this.log = logProvider.getLog( getClass() );
this.maxBatch = maxBatch;

Expand All @@ -52,7 +61,7 @@ public BatchingMessageHandler( MessageHandler<RaftMessage> innerHandler, LogProv
}

@Override
public void handle( RaftMessage message )
public void handle( RaftMessages.StoreIdAwareMessage message )
{
try
{
Expand All @@ -67,7 +76,7 @@ public void handle( RaftMessage message )
@Override
public void run()
{
RaftMessage message = null;
RaftMessages.StoreIdAwareMessage message = null;
try
{
message = messageQueue.poll( 1, SECONDS );
Expand All @@ -79,26 +88,66 @@ public void run()

if ( message != null )
{
if ( messageQueue.isEmpty() )
// do the check here
RaftMessages.RaftMessage innerMessage = message.message();
StoreId storeId = message.storeId();

if ( message.storeId().equals( localDatabase.storeId() ) )
{
innerHandler.handle( message );
if ( messageQueue.isEmpty() )
{
innerHandler.handle( message.message() );
}
else
{
batch.clear();
batch.add( innerMessage );
drain( messageQueue, batch, maxBatch - 1 );
collateAndHandleBatch( batch );
}
}
else
{
batch.clear();
batch.add( message );
messageQueue.drainTo( batch, maxBatch - 1 );
if ( localDatabase.isEmpty() )
{
raftStateMachine.downloadSnapshot( innerMessage.from() );
}
else
{
log.info( "Discarding message owing to mismatched storeId and non-empty store. " +
"Expected: %s, Encountered: %s", storeId, localDatabase.storeId() );
listeners.forEach( l -> {
MismatchedStoreIdException ex = new MismatchedStoreIdException( storeId, localDatabase.storeId() );
l.onMismatchedStore( ex );
} );
}

collateAndHandleBatch( batch );
}
}
}

private void collateAndHandleBatch( List<RaftMessage> batch )
private void drain( BlockingQueue<RaftMessages.StoreIdAwareMessage> messageQueue,
List<RaftMessage> batch, int maxElements )
{
List<RaftMessages.StoreIdAwareMessage> tempDraining = new ArrayList<>();
messageQueue.drainTo( tempDraining, maxElements );

for ( RaftMessages.StoreIdAwareMessage storeIdAwareMessage : tempDraining )
{
batch.add( storeIdAwareMessage.message() );
}
}

public void addMismatchedStoreListener( BatchingMessageHandler.MismatchedStoreListener listener )
{
listeners.add(listener);
}

private void collateAndHandleBatch( List<RaftMessages.RaftMessage> batch )
{
RaftMessages.NewEntry.Batch batchRequest = null;

for ( RaftMessage message : batch )
for ( RaftMessages.RaftMessage message : batch )
{
if ( message instanceof RaftMessages.NewEntry.Request )
{
Expand Down
@@ -0,0 +1,41 @@
/*
* Copyright (c) 2002-2016 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.coreedge.raft;

import org.neo4j.coreedge.server.StoreId;
import org.neo4j.kernel.impl.store.StoreFailureException;

public interface MismatchedStoreIdService
{
void addMismatchedStoreListener( BatchingMessageHandler.MismatchedStoreListener listener );

interface MismatchedStoreListener
{
void onMismatchedStore( BatchingMessageHandler.MismatchedStoreIdException ex );
}

class MismatchedStoreIdException extends StoreFailureException
{
public MismatchedStoreIdException( StoreId expected, StoreId encountered )
{
super( "Expected:" + expected + ", encountered:" + encountered );
}
}
}
Expand Up @@ -38,34 +38,30 @@
import java.util.concurrent.TimeUnit;

import org.neo4j.coreedge.catchup.storecopy.LocalDatabase;
import org.neo4j.coreedge.raft.membership.MembershipWaiter;
import org.neo4j.coreedge.raft.net.Inbound;
import org.neo4j.coreedge.raft.net.codecs.RaftMessageDecoder;
import org.neo4j.coreedge.raft.replication.ReplicatedContent;
import org.neo4j.coreedge.raft.state.ChannelMarshal;
import org.neo4j.coreedge.server.ListenSocketAddress;
import org.neo4j.coreedge.server.StoreId;
import org.neo4j.coreedge.server.logging.ExceptionLoggingHandler;
import org.neo4j.helpers.NamedThreadFactory;
import org.neo4j.kernel.impl.store.MismatchingStoreIdException;
import org.neo4j.kernel.impl.store.StoreFailureException;
import org.neo4j.kernel.lifecycle.LifecycleAdapter;
import org.neo4j.logging.Log;
import org.neo4j.logging.LogProvider;

import static java.lang.String.format;

public class RaftServer extends LifecycleAdapter implements Inbound<RaftMessages.RaftMessage>
public class RaftServer extends LifecycleAdapter implements Inbound<RaftMessages.StoreIdAwareMessage>
{
private final ListenSocketAddress listenAddress;
private final LocalDatabase localDatabase;
private final RaftStateMachine raftStateMachine;
private final Log log;
private final ChannelMarshal<ReplicatedContent> marshal;
private MessageHandler<RaftMessages.RaftMessage> messageHandler;
private MessageHandler<RaftMessages.StoreIdAwareMessage> messageHandler;
private EventLoopGroup workerGroup;
private Channel channel;
private final List<MismatchedStoreListener> listeners = new ArrayList<>();
private final List<BatchingMessageHandler.MismatchedStoreListener> listeners = new ArrayList<>();

private final NamedThreadFactory threadFactory = new NamedThreadFactory( "raft-server" );

Expand Down Expand Up @@ -134,16 +130,11 @@ protected void initChannel( SocketChannel ch ) throws Exception
}

@Override
public void registerHandler( Inbound.MessageHandler<RaftMessages.RaftMessage> handler )
public void registerHandler( Inbound.MessageHandler<RaftMessages.StoreIdAwareMessage> handler )
{
this.messageHandler = handler;
}

public void addMismatchedStoreListener( MismatchedStoreListener listener )
{
listeners.add( listener );
}

private class RaftMessageHandler extends SimpleChannelInboundHandler<RaftMessages.StoreIdAwareMessage>
{
@Override
Expand All @@ -152,29 +143,7 @@ protected void channelRead0( ChannelHandlerContext channelHandlerContext,
{
try
{
RaftMessages.RaftMessage message = storeIdAwareMessage.message();
StoreId storeId = storeIdAwareMessage.storeId();

if ( storeId.equals( localDatabase.storeId() ) )
{
messageHandler.handle( message );
}
else
{
if ( localDatabase.isEmpty() )
{
raftStateMachine.downloadSnapshot( message.from() );
}
else
{
log.info( "Discarding message owing to mismatched storeId and non-empty store. Expected: %s, " +
"Encountered: %s", storeId, localDatabase.storeId() );
listeners.forEach( l -> {
MismatchedStoreIdException ex = new MismatchedStoreIdException( storeId, localDatabase.storeId() );
l.onMismatchedStore( ex );
} );
}
}
messageHandler.handle( storeIdAwareMessage );
}
catch ( Exception e )
{
Expand All @@ -183,16 +152,4 @@ protected void channelRead0( ChannelHandlerContext channelHandlerContext,
}
}

public interface MismatchedStoreListener
{
void onMismatchedStore(MismatchedStoreIdException ex);
}

public class MismatchedStoreIdException extends StoreFailureException
{
public MismatchedStoreIdException( StoreId expected, StoreId encountered )
{
super( "Expected:" + expected + ", encountered:" + encountered );
}
}
}
Expand Up @@ -22,10 +22,10 @@
import java.util.Set;
import java.util.concurrent.CompletableFuture;

import org.neo4j.coreedge.raft.RaftServer;
import org.neo4j.coreedge.raft.BatchingMessageHandler;
import org.neo4j.coreedge.raft.MismatchedStoreIdService;
import org.neo4j.coreedge.raft.state.ReadableRaftState;
import org.neo4j.coreedge.server.CoreMember;
import org.neo4j.kernel.impl.store.MismatchingStoreIdException;
import org.neo4j.kernel.impl.util.JobScheduler;
import org.neo4j.logging.Log;
import org.neo4j.logging.LogProvider;
Expand Down Expand Up @@ -53,15 +53,16 @@ public class MembershipWaiter
private final CoreMember myself;
private final JobScheduler jobScheduler;
private final long maxCatchupLag;
private final RaftServer raftServer;
private final MismatchedStoreIdService mismatchedStoreIdService;
private final Log log;

public MembershipWaiter( CoreMember myself, JobScheduler jobScheduler, long maxCatchupLag, RaftServer raftServer, LogProvider logProvider )
public MembershipWaiter( CoreMember myself, JobScheduler jobScheduler, long maxCatchupLag,
MismatchedStoreIdService mismatchedStoreIdService, LogProvider logProvider )
{
this.myself = myself;
this.jobScheduler = jobScheduler;
this.maxCatchupLag = maxCatchupLag;
this.raftServer = raftServer;
this.mismatchedStoreIdService = mismatchedStoreIdService;
this.log = logProvider.getLog( getClass() );
}

Expand All @@ -70,7 +71,7 @@ public CompletableFuture<Boolean> waitUntilCaughtUpMember( ReadableRaftState raf
CompletableFuture<Boolean> catchUpFuture = new CompletableFuture<>();

Evaluator evaluator = new Evaluator( raftState, catchUpFuture );
raftServer.addMismatchedStoreListener( evaluator );
mismatchedStoreIdService.addMismatchedStoreListener( evaluator );

JobScheduler.JobHandle jobHandle = jobScheduler.scheduleRecurring(
new JobScheduler.Group( getClass().toString(), POOLED ),
Expand All @@ -81,7 +82,7 @@ public CompletableFuture<Boolean> waitUntilCaughtUpMember( ReadableRaftState raf
return catchUpFuture;
}

private class Evaluator implements Runnable, RaftServer.MismatchedStoreListener
private class Evaluator implements Runnable, BatchingMessageHandler.MismatchedStoreListener
{
private final ReadableRaftState raftState;
private final CompletableFuture<Boolean> catchUpFuture;
Expand Down Expand Up @@ -138,7 +139,7 @@ private boolean caughtUpWithLeader()
}

@Override
public void onMismatchedStore(RaftServer.MismatchedStoreIdException ex)
public void onMismatchedStore(BatchingMessageHandler.MismatchedStoreIdException ex)
{
catchUpFuture.completeExceptionally( ex );
}
Expand Down
Expand Up @@ -20,6 +20,7 @@
package org.neo4j.coreedge.raft.net;

import org.neo4j.coreedge.network.Message;
import org.neo4j.coreedge.raft.BatchingMessageHandler;

public interface Inbound<M extends Message>
{
Expand Down

0 comments on commit 551165b

Please sign in to comment.