Skip to content

Commit

Permalink
Added tests around the layer down StoreId verification.
Browse files Browse the repository at this point in the history
  • Loading branch information
Mark Needham committed Jul 8, 2016
1 parent aeef37b commit 64ab9ff
Show file tree
Hide file tree
Showing 4 changed files with 96 additions and 17 deletions.
Expand Up @@ -88,7 +88,6 @@ public void run()

if ( message != null )
{
// do the check here
RaftMessages.RaftMessage innerMessage = message.message();
StoreId storeId = message.storeId();

Expand Down
Expand Up @@ -24,11 +24,11 @@

public interface MismatchedStoreIdService
{
void addMismatchedStoreListener( BatchingMessageHandler.MismatchedStoreListener listener );
void addMismatchedStoreListener( MismatchedStoreListener listener );

interface MismatchedStoreListener
{
void onMismatchedStore( BatchingMessageHandler.MismatchedStoreIdException ex );
void onMismatchedStore( MismatchedStoreIdException ex );
}

class MismatchedStoreIdException extends StoreFailureException
Expand Down
Expand Up @@ -82,7 +82,7 @@ public CompletableFuture<Boolean> waitUntilCaughtUpMember( ReadableRaftState raf
return catchUpFuture;
}

private class Evaluator implements Runnable, BatchingMessageHandler.MismatchedStoreListener
private class Evaluator implements Runnable, MismatchedStoreIdService.MismatchedStoreListener
{
private final ReadableRaftState raftState;
private final CompletableFuture<Boolean> catchUpFuture;
Expand Down
Expand Up @@ -25,14 +25,18 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;

import org.neo4j.coreedge.catchup.storecopy.LocalDatabase;
import org.neo4j.coreedge.raft.net.Inbound;
import org.neo4j.coreedge.server.StoreId;
import org.neo4j.logging.AssertableLogProvider;
import org.neo4j.logging.NullLogProvider;

import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.verifyZeroInteractions;
import static org.mockito.Mockito.when;

Expand All @@ -42,25 +46,101 @@ public class BatchingMessageHandlerTest
private static final int QUEUE_SIZE = 64;
private LocalDatabase localDatabase = mock( LocalDatabase.class );
private RaftStateMachine raftStateMachine = mock( RaftStateMachine.class );
private StoreId localStoreId = new StoreId( 1, 2, 3, 4 );

@Before
public void setup()
{
when(localDatabase.storeId()).thenReturn( new StoreId( 1,2,3,4 ) );
when( localDatabase.storeId() ).thenReturn( localStoreId );
}

@Test
public void shouldDownloadSnapshotOnStoreIdMismatch() throws Exception
{
// given
@SuppressWarnings("unchecked")
Inbound.MessageHandler<RaftMessages.RaftMessage> innerHandler = mock( Inbound.MessageHandler.class );

when( localDatabase.isEmpty() ).thenReturn( true );
BatchingMessageHandler batchHandler = new BatchingMessageHandler(
innerHandler, NullLogProvider.getInstance(), QUEUE_SIZE, MAX_BATCH, localDatabase, raftStateMachine );
RaftMessages.NewEntry.Request message = new RaftMessages.NewEntry.Request( null, null );

StoreId otherStoreId = new StoreId( 5, 6, 7, 8 );

batchHandler.handle( new RaftMessages.StoreIdAwareMessage( otherStoreId, message ) );

// when
batchHandler.run();

// then
verifyNoMoreInteractions( innerHandler );
verify( raftStateMachine ).downloadSnapshot( message.from() );
}

@Test
public void shouldLogOnStoreIdMismatchAndNonEmptyStore() throws Exception
{
// given
@SuppressWarnings("unchecked")
Inbound.MessageHandler<RaftMessages.RaftMessage> innerHandler = mock( Inbound.MessageHandler.class );

when( localDatabase.isEmpty() ).thenReturn( false );
AssertableLogProvider logProvider = new AssertableLogProvider();
BatchingMessageHandler batchHandler = new BatchingMessageHandler(
innerHandler, logProvider, QUEUE_SIZE, MAX_BATCH, localDatabase, raftStateMachine );
RaftMessages.NewEntry.Request message = new RaftMessages.NewEntry.Request( null, null );

StoreId otherStoreId = new StoreId( 5, 6, 7, 8 );

batchHandler.handle( new RaftMessages.StoreIdAwareMessage( otherStoreId, message ) );

// when
batchHandler.run();

// then
verifyNoMoreInteractions( innerHandler );
logProvider.assertContainsLogCallContaining( "Discarding message" );
}

@Test
public void shouldInformListenersOnStoreIdMismatch() throws Exception
{
// given
@SuppressWarnings("unchecked")
Inbound.MessageHandler<RaftMessages.RaftMessage> innerHandler = mock( Inbound.MessageHandler.class );

when( localDatabase.isEmpty() ).thenReturn( false );
BatchingMessageHandler batchHandler = new BatchingMessageHandler(
innerHandler, NullLogProvider.getInstance(), QUEUE_SIZE, MAX_BATCH, localDatabase, raftStateMachine );
RaftMessages.NewEntry.Request message = new RaftMessages.NewEntry.Request( null, null );

StoreId otherStoreId = new StoreId( 5, 6, 7, 8 );

AtomicBoolean listenerInvoked = new AtomicBoolean( false );
batchHandler.addMismatchedStoreListener( ex -> listenerInvoked.set( true ) );
batchHandler.handle( new RaftMessages.StoreIdAwareMessage( otherStoreId, message ) );

// when
batchHandler.run();

// then
verifyNoMoreInteractions( innerHandler );
assertTrue(listenerInvoked.get());
}

@Test
public void shouldInvokeInnerHandlerWhenRun() throws Exception
{
// given
@SuppressWarnings( "unchecked" )
@SuppressWarnings("unchecked")
Inbound.MessageHandler<RaftMessages.RaftMessage> innerHandler = mock( Inbound.MessageHandler.class );

BatchingMessageHandler batchHandler = new BatchingMessageHandler(
innerHandler, NullLogProvider.getInstance(), QUEUE_SIZE, MAX_BATCH, localDatabase, raftStateMachine );
RaftMessages.NewEntry.Request message = new RaftMessages.NewEntry.Request( null, null );

batchHandler.handle( new RaftMessages.StoreIdAwareMessage( new StoreId( 1,2,3,4 ), message ) );
batchHandler.handle( new RaftMessages.StoreIdAwareMessage( localStoreId, message ) );
verifyZeroInteractions( innerHandler );

// when
Expand All @@ -74,7 +154,7 @@ public void shouldInvokeInnerHandlerWhenRun() throws Exception
public void shouldInvokeHandlerOnQueuedMessage() throws Exception
{
// given
@SuppressWarnings( "unchecked" )
@SuppressWarnings("unchecked")
Inbound.MessageHandler<RaftMessages.RaftMessage> innerHandler = mock( Inbound.MessageHandler.class );

BatchingMessageHandler batchHandler = new BatchingMessageHandler(
Expand All @@ -91,7 +171,7 @@ public void shouldInvokeHandlerOnQueuedMessage() throws Exception
Thread.sleep( 50 );

// when
batchHandler.handle( new RaftMessages.StoreIdAwareMessage( new StoreId( 1,2,3,4 ), message ) );
batchHandler.handle( new RaftMessages.StoreIdAwareMessage( localStoreId, message ) );

// then
future.get();
Expand All @@ -102,7 +182,7 @@ public void shouldInvokeHandlerOnQueuedMessage() throws Exception
public void shouldBatchRequests() throws Exception
{
// given
@SuppressWarnings( "unchecked" )
@SuppressWarnings("unchecked")
Inbound.MessageHandler<RaftMessages.RaftMessage> innerHandler = mock( Inbound.MessageHandler.class );

BatchingMessageHandler batchHandler = new BatchingMessageHandler(
Expand All @@ -112,8 +192,8 @@ public void shouldBatchRequests() throws Exception
RaftMessages.NewEntry.Request messageA = new RaftMessages.NewEntry.Request( null, contentA );
RaftMessages.NewEntry.Request messageB = new RaftMessages.NewEntry.Request( null, contentB );

batchHandler.handle( new RaftMessages.StoreIdAwareMessage( new StoreId( 1,2,3,4 ), messageA ) );
batchHandler.handle( new RaftMessages.StoreIdAwareMessage( new StoreId( 1,2,3,4 ), messageB ) );
batchHandler.handle( new RaftMessages.StoreIdAwareMessage( localStoreId, messageA ) );
batchHandler.handle( new RaftMessages.StoreIdAwareMessage( localStoreId, messageB ) );
verifyZeroInteractions( innerHandler );

// when
Expand All @@ -130,7 +210,7 @@ public void shouldBatchRequests() throws Exception
public void shouldBatchNewEntriesAndHandleOtherMessagesSingularly() throws Exception
{
// given
@SuppressWarnings( "unchecked" )
@SuppressWarnings("unchecked")
Inbound.MessageHandler<RaftMessages.RaftMessage> innerHandler = mock( Inbound.MessageHandler.class );

BatchingMessageHandler batchHandler = new BatchingMessageHandler(
Expand All @@ -143,10 +223,10 @@ public void shouldBatchNewEntriesAndHandleOtherMessagesSingularly() throws Excep
RaftMessages.NewEntry.Request messageC = new RaftMessages.NewEntry.Request( null, contentC );
RaftMessages.Heartbeat messageD = new RaftMessages.Heartbeat( null, 1, 1, 1 );

batchHandler.handle( new RaftMessages.StoreIdAwareMessage( new StoreId( 1,2,3,4 ), messageA ) );
batchHandler.handle( new RaftMessages.StoreIdAwareMessage( new StoreId( 1,2,3,4 ), messageB ) );
batchHandler.handle( new RaftMessages.StoreIdAwareMessage( new StoreId( 1,2,3,4 ), messageC ) );
batchHandler.handle( new RaftMessages.StoreIdAwareMessage( new StoreId( 1,2,3,4 ), messageD ) );
batchHandler.handle( new RaftMessages.StoreIdAwareMessage( localStoreId, messageA ) );
batchHandler.handle( new RaftMessages.StoreIdAwareMessage( localStoreId, messageB ) );
batchHandler.handle( new RaftMessages.StoreIdAwareMessage( localStoreId, messageC ) );
batchHandler.handle( new RaftMessages.StoreIdAwareMessage( localStoreId, messageD ) );
verifyZeroInteractions( innerHandler );

// when
Expand Down

0 comments on commit 64ab9ff

Please sign in to comment.