From af3abdbc9170cac00da152697981943559f8f001 Mon Sep 17 00:00:00 2001 From: Mark Needham Date: Thu, 16 Jun 2016 11:54:47 +0100 Subject: [PATCH] Split Outbound to have two sends which makes it much easier for clients to use --- .../catchup/storecopy/CoreClient.java | 5 +- .../org/neo4j/coreedge/raft/RaftMessages.java | 2 - .../coreedge/raft/net/LoggingOutbound.java | 11 +++- .../org/neo4j/coreedge/raft/net/Outbound.java | 11 +++- .../neo4j/coreedge/raft/net/RaftOutbound.java | 25 +++++---- .../raft/net/codecs/RaftMessageEncoder.java | 2 - .../neo4j/coreedge/server/SenderService.java | 35 ++++++++----- .../neo4j/coreedge/raft/DirectNetworking.java | 29 ++++++++--- .../raft/OutboundMessageCollector.java | 32 +++++++----- .../coreedge/raft/RaftInstanceBuilder.java | 15 +++++- .../neo4j/coreedge/raft/RaftTestNetwork.java | 51 ++++++++++++------- .../raft/replication/RaftReplicatorTest.java | 10 +++- .../shipping/RaftLogShipperTest.java | 19 ++++--- .../org/neo4j/test/matchers/Matchers.java | 17 ++++--- 14 files changed, 180 insertions(+), 84 deletions(-) diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/storecopy/CoreClient.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/storecopy/CoreClient.java index 21ea0f86774ae..b55dc47e40954 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/storecopy/CoreClient.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/storecopy/CoreClient.java @@ -23,6 +23,7 @@ import io.netty.channel.socket.SocketChannel; import java.io.IOException; +import java.util.Arrays; import java.util.concurrent.CompletableFuture; import org.neo4j.coreedge.catchup.RequestMessageType; @@ -47,6 +48,8 @@ import org.neo4j.kernel.monitoring.Monitors; import org.neo4j.logging.LogProvider; +import static java.util.Arrays.asList; + public abstract class CoreClient extends LifecycleAdapter implements StoreFileReceiver, StoreFileStreamingCompleteListener, TxStreamCompleteListener, TxPullResponseListener, CoreSnapshotListener @@ -92,7 +95,7 @@ public void pollForTransactions( AdvertisedSocketAddress serverAddress, long las private void send( AdvertisedSocketAddress to, RequestMessageType messageType, Message contentMessage ) { - senderService.send( to, messageType, contentMessage ); + senderService.send( to, asList( messageType, contentMessage ) ); } @Override diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/RaftMessages.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/RaftMessages.java index 172d66033fc39..c7ddf2c202edc 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/RaftMessages.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/RaftMessages.java @@ -25,8 +25,6 @@ import java.util.List; import java.util.Objects; -import com.sun.org.apache.xml.internal.security.algorithms.MessageDigestAlgorithm; - import org.neo4j.coreedge.network.Message; import org.neo4j.coreedge.raft.log.RaftLogEntry; import org.neo4j.coreedge.raft.replication.ReplicatedContent; diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/net/LoggingOutbound.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/net/LoggingOutbound.java index 3bd59f49423b3..f1148b45203f3 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/net/LoggingOutbound.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/net/LoggingOutbound.java @@ -19,6 +19,8 @@ */ package org.neo4j.coreedge.raft.net; +import java.util.Collection; + import org.neo4j.coreedge.network.Message; import org.neo4j.coreedge.server.logging.MessageLogger; @@ -37,7 +39,14 @@ public LoggingOutbound( Outbound outbound, MEMBER me, MessageLog } @Override - public void send( MEMBER to, MESSAGE... messages ) + public void send( MEMBER to, MESSAGE message ) + { + messageLogger.log( me, to, message ); + outbound.send( to, message ); + } + + @Override + public void send( MEMBER to, Collection messages ) { messageLogger.log( me, to, messages ); outbound.send( to, messages ); diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/net/Outbound.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/net/Outbound.java index 58842b7f23f50..9497875dccd9f 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/net/Outbound.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/net/Outbound.java @@ -19,6 +19,8 @@ */ package org.neo4j.coreedge.raft.net; +import java.util.Collection; + import org.neo4j.coreedge.network.Message; /** @@ -29,10 +31,17 @@ */ public interface Outbound { + /** + * Asynchronous, best effort delivery to destination. + * @param to destination + * @param message The message to send + */ + void send( MEMBER to, MESSAGE message ); + /** * Asynchronous, best effort delivery to destination. * @param to destination * @param messages The messages to send */ - void send( MEMBER to, MESSAGE... messages ); + void send( MEMBER to, Collection messages ); } diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/net/RaftOutbound.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/net/RaftOutbound.java index 061a6752b73e2..773b33be80e29 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/net/RaftOutbound.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/net/RaftOutbound.java @@ -19,17 +19,17 @@ */ package org.neo4j.coreedge.raft.net; -import java.util.Arrays; +import java.util.Collection; import org.neo4j.coreedge.catchup.storecopy.LocalDatabase; import org.neo4j.coreedge.network.Message; - -import org.neo4j.coreedge.raft.RaftMessages; import org.neo4j.coreedge.raft.RaftMessages.RaftMessage; import org.neo4j.coreedge.raft.RaftMessages.StoreIdAwareMessage; import org.neo4j.coreedge.server.AdvertisedSocketAddress; import org.neo4j.coreedge.server.CoreMember; +import static java.util.stream.Collectors.toList; + public class RaftOutbound implements Outbound> { private final Outbound outbound; @@ -42,12 +42,19 @@ public RaftOutbound( Outbound outbound, LocalDa } @Override - public void send( CoreMember to, RaftMessage... messages ) + public void send( CoreMember to, RaftMessage message ) + { + outbound.send( to.getRaftAddress(), storeIdify( message ) ); + } + + @Override + public void send( CoreMember to, Collection> raftMessages ) + { + outbound.send( to.getRaftAddress(), raftMessages.stream().map( this::storeIdify ).collect( toList() ) ); + } + + private StoreIdAwareMessage storeIdify( RaftMessage m ) { - @SuppressWarnings("unchecked") - StoreIdAwareMessage[] storeIdAwareMessages = Arrays.stream( messages ). - map( m -> new StoreIdAwareMessage<>( localDatabase.storeId(), m ) ). - toArray( StoreIdAwareMessage[]::new ); - outbound.send( to.getRaftAddress(), storeIdAwareMessages ); + return new StoreIdAwareMessage<>( localDatabase.storeId(), m ); } } diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/net/codecs/RaftMessageEncoder.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/net/codecs/RaftMessageEncoder.java index 1833ef315f87c..6b8a49da7d94b 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/net/codecs/RaftMessageEncoder.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/net/codecs/RaftMessageEncoder.java @@ -135,6 +135,4 @@ private void writeMember( CoreMember member, WritableChannel buffer ) throws IOE marshal.marshal( member.getCoreAddress(), buffer ); marshal.marshal( member.getRaftAddress(), buffer ); } - - } diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/SenderService.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/SenderService.java index 2f80307f0731c..fe2225d2cd49f 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/SenderService.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/SenderService.java @@ -19,6 +19,7 @@ */ package org.neo4j.coreedge.server; +import java.util.Collection; import java.util.Iterator; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -71,7 +72,7 @@ public SenderService( ChannelInitializer channelInitializer, } @Override - public void send( AdvertisedSocketAddress to, Message... messages ) + public void send( AdvertisedSocketAddress to, Message message ) { serviceLock.readLock().lock(); try @@ -81,13 +82,7 @@ public void send( AdvertisedSocketAddress to, Message... messages ) return; } - MessageQueueMonitor monitor = monitors.newMonitor( MessageQueueMonitor.class, NonBlockingChannel.class ); - NonBlockingChannel nonBlockingChannel = getAndUpdateLife( to, monitor ); - monitor.register( to.socketAddress() ); - for ( Object msg : messages ) - { - nonBlockingChannel.send( msg ); - } + channel( to ).send( message ); } finally { @@ -95,13 +90,29 @@ public void send( AdvertisedSocketAddress to, Message... messages ) } } - public int activeChannelCount() + @Override + public void send( AdvertisedSocketAddress to, Collection messages ) { - return nonBlockingChannels.size(); + serviceLock.readLock().lock(); + try + { + if ( !senderServiceRunning ) + { + return; + } + + NonBlockingChannel channel = channel( to ); + messages.forEach( channel::send ); + } + finally + { + serviceLock.readLock().unlock(); + } } - private NonBlockingChannel getAndUpdateLife( AdvertisedSocketAddress to, MessageQueueMonitor monitor ) + private NonBlockingChannel channel( AdvertisedSocketAddress to ) { + MessageQueueMonitor monitor = monitors.newMonitor( MessageQueueMonitor.class, NonBlockingChannel.class ); NonBlockingChannel nonBlockingChannel = nonBlockingChannels.get( to ); if ( nonBlockingChannel == null ) @@ -116,6 +127,7 @@ private NonBlockingChannel getAndUpdateLife( AdvertisedSocketAddress to, Message } } + monitor.register( to.socketAddress() ); return nonBlockingChannel; } @@ -175,5 +187,4 @@ public synchronized void stop() serviceLock.writeLock().unlock(); } } - } diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/DirectNetworking.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/DirectNetworking.java index 6b39e606989bd..e1cfb5a8e9642 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/DirectNetworking.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/DirectNetworking.java @@ -20,6 +20,8 @@ package org.neo4j.coreedge.raft; import org.neo4j.coreedge.network.Message; + +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.LinkedList; @@ -75,7 +77,8 @@ public void reconnect( long id ) disconnectedMembers.remove( id ); } - public class Outbound implements org.neo4j.coreedge.raft.net.Outbound> + public class Outbound implements + org.neo4j.coreedge.raft.net.Outbound> { private final long me; @@ -85,20 +88,30 @@ public Outbound( long me ) } @Override - public synchronized void send( RaftTestMember to, final RaftMessages.RaftMessage... messages ) + public void send( RaftTestMember to, RaftMessages.RaftMessage message ) { - if ( !messageQueues.containsKey( to.getId() ) || - disconnectedMembers.contains( to.getId() ) || - disconnectedMembers.contains( me ) ) + + if ( canDeliver( to ) ) { - return; + messageQueues.get( to.getId() ).add( message ); } + } - for ( Message message : messages ) + @Override + public void send( RaftTestMember to, Collection> messages ) + { + if ( canDeliver( to ) ) { - messageQueues.get( to.getId() ).add( message ); + messageQueues.get( to.getId() ).addAll( messages ); } } + + private boolean canDeliver( RaftTestMember to ) + { + return messageQueues.containsKey( to.getId() ) && + !disconnectedMembers.contains( to.getId() ) && + !disconnectedMembers.contains( me ); + } } public class Inbound implements org.neo4j.coreedge.raft.net.Inbound diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/OutboundMessageCollector.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/OutboundMessageCollector.java index 40d8c89452552..312102838a010 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/OutboundMessageCollector.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/OutboundMessageCollector.java @@ -21,6 +21,7 @@ import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -33,7 +34,7 @@ public class OutboundMessageCollector implements Outbound> { - Map> sentMessages = new HashMap<>(); + Map>> sentMessages = new HashMap<>(); public void clear() { @@ -41,21 +42,31 @@ public void clear() } @Override - public void send( RaftTestMember to, RaftMessages.RaftMessage... messages ) + public void send( RaftTestMember to, RaftMessages.RaftMessage message ) { - List messagesToMember = sentMessages.get( to ); + raftMessages( to ).add( message ); + } + + @Override + public void send( RaftTestMember to, Collection> messages ) + { + raftMessages( to ).addAll( messages ); + } + + private List> raftMessages( RaftTestMember to ) + { + List> messagesToMember = sentMessages.get( to ); if ( messagesToMember == null ) { messagesToMember = new ArrayList<>(); sentMessages.put( to, messagesToMember ); } - - Collections.addAll( messagesToMember, messages ); + return messagesToMember; } - public List sentTo( RaftTestMember member ) + public List> sentTo( RaftTestMember member ) { - List messages = sentMessages.get( member ); + List> messages = sentMessages.get( member ); if ( messages == null ) { @@ -67,7 +78,7 @@ public List sentTo( RaftTestMember member ) public boolean hasAnyEntriesTo( RaftTestMember member ) { - List messages = sentMessages.get( member ); + List> messages = sentMessages.get( member ); return messages != null && messages.size() != 0; } @@ -79,10 +90,7 @@ public boolean hasEntriesTo( RaftTestMember member, RaftLogEntry... expectedMess { if ( message instanceof RaftMessages.AppendEntries.Request ) { - for ( RaftLogEntry actualEntry : ((RaftMessages.AppendEntries.Request) message).entries() ) - { - actualMessages.add( actualEntry ); - } + Collections.addAll( actualMessages, ((RaftMessages.AppendEntries.Request) message).entries() ); } } diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/RaftInstanceBuilder.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/RaftInstanceBuilder.java index 2933aa092a212..99b505b0df5b0 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/RaftInstanceBuilder.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/RaftInstanceBuilder.java @@ -20,6 +20,7 @@ package org.neo4j.coreedge.raft; import java.time.Clock; +import java.util.Collection; import java.util.function.Supplier; import org.neo4j.coreedge.catchup.storecopy.LocalDatabase; @@ -61,7 +62,19 @@ public class RaftInstanceBuilder NullLogProvider.getInstance() ); private Inbound> inbound = handler -> {}; - private Outbound> outbound = ( advertisedSocketAddress, messages ) -> {}; + private Outbound> outbound = + new Outbound>() + { + @Override + public void send( MEMBER to, RaftMessages.RaftMessage message ) + { + } + + @Override + public void send( MEMBER to, Collection> raftMessages ) + { + } + }; private LogProvider logProvider = NullLogProvider.getInstance(); private Clock clock = Clock.systemUTC(); diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/RaftTestNetwork.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/RaftTestNetwork.java index 26c8cb552162f..a5b34e98cac38 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/RaftTestNetwork.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/RaftTestNetwork.java @@ -19,6 +19,7 @@ */ package org.neo4j.coreedge.raft; +import java.util.Collection; import java.util.Comparator; import java.util.HashMap; import java.util.Iterator; @@ -40,9 +41,9 @@ public class RaftTestNetwork private final Map outboundChannels = new HashMap<>(); private final AtomicLong seqGen = new AtomicLong(); - private final BiFunction latencySpecMillis; + private final BiFunction latencySpecMillis; - public RaftTestNetwork( BiFunction latencySpecMillis ) + public RaftTestNetwork( BiFunction latencySpecMillis ) { this.latencySpecMillis = latencySpecMillis; } @@ -149,14 +150,22 @@ public void stop() throws InterruptedException } @Override - public void send( T destination, final Message... messages ) + public void send( T destination, Message message ) + { + doSend( destination, message, System.currentTimeMillis() ); + } + + @Override + public void send( T destination, Collection messages ) { long now = System.currentTimeMillis(); - for ( Message message : messages ) - { - long atMillis = now + latencySpecMillis.apply( me, destination ); - networkThread.scheduleDelivery( destination, message, atMillis ); - } + messages.forEach( message -> doSend( destination, message, now ) ); + } + + private void doSend( T destination, Message message, long now ) + { + long atMillis = now + latencySpecMillis.apply( me, destination ); + networkThread.scheduleDelivery( destination, message, atMillis ); } public void disconnect() @@ -209,9 +218,13 @@ private MessageContext( T destination, Message message, long atMillis ) public boolean equals( Object o ) { if ( this == o ) - { return true; } + { + return true; + } if ( o == null || getClass() != o.getClass() ) - { return false; } + { + return false; + } MessageContext that = (MessageContext) o; return seqNum == that.seqNum; } @@ -225,7 +238,7 @@ public int hashCode() public synchronized void scheduleDelivery( T destination, Message message, long atMillis ) { - if( !disconnected ) + if ( !disconnected ) { msgQueue.add( new MessageContext( destination, message, atMillis ) ); notifyAll(); @@ -235,18 +248,18 @@ public synchronized void scheduleDelivery( T destination, Message message, long @Override public synchronized void run() { - while( !done ) + while ( !done ) { long now = System.currentTimeMillis(); /* Process message ready for delivery */ Iterator itr = msgQueue.iterator(); MessageContext context; - while( itr.hasNext() && ( context = itr.next() ).atMillis <= now ) + while ( itr.hasNext() && (context = itr.next()).atMillis <= now ) { itr.remove(); Inbound inbound = inboundChannels.get( context.destination ); - if( inbound != null ) + if ( inbound != null ) { inbound.deliver( context.message ); } @@ -259,12 +272,12 @@ public synchronized void run() { MessageContext first = msgQueue.first(); long waitTime = first.atMillis - System.currentTimeMillis(); - if( waitTime > 0 ) + if ( waitTime > 0 ) { wait( waitTime ); } } - catch( NoSuchElementException e ) + catch ( NoSuchElementException e ) { wait( 1000 ); } @@ -304,7 +317,7 @@ public void stop() throws InterruptedException public synchronized void deliver( Message message ) { - if( !disconnected ) + if ( !disconnected ) { Q.add( message ); } @@ -340,12 +353,12 @@ public void kill() throws InterruptedException @Override public void run() { - while( !done ) + while ( !done ) { try { Message message = Q.poll( 1, TimeUnit.SECONDS ); - if( message != null && handler != null ) + if ( message != null && handler != null ) { handler.handle( message ); } diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/replication/RaftReplicatorTest.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/replication/RaftReplicatorTest.java index bd663cb31b95a..711b47763e007 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/replication/RaftReplicatorTest.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/replication/RaftReplicatorTest.java @@ -21,6 +21,7 @@ import org.junit.Test; +import java.util.Collection; import java.util.UUID; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; @@ -218,10 +219,17 @@ private class CapturingOutbound implements Outbound messages ) + { + this.lastTo = to; + this.count+=messages.size(); + } } } diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/replication/shipping/RaftLogShipperTest.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/replication/shipping/RaftLogShipperTest.java index 4d22766eb6a68..452c7de74b94b 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/replication/shipping/RaftLogShipperTest.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/replication/shipping/RaftLogShipperTest.java @@ -23,6 +23,7 @@ import java.time.Clock; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.concurrent.atomic.AtomicBoolean; import org.junit.After; @@ -47,6 +48,10 @@ import org.neo4j.logging.Log; import org.neo4j.logging.LogProvider; import org.neo4j.test.DoubleLatch; +import org.neo4j.test.matchers.Matchers; + +import static java.util.Arrays.asList; +import static java.util.Collections.singletonList; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThat; @@ -130,7 +135,7 @@ public void shouldSendLastEntryOnStart() throws Throwable startLogShipper(); // then - assertThat( outbound.sentTo( follower ), hasRaftLogEntries( entry1 ) ); + assertThat( outbound.sentTo( follower ), Matchers.hasRaftLogEntries( singletonList( entry1 ) ) ); } @Test @@ -146,7 +151,7 @@ public void shouldSendPreviousEntryOnMismatch() throws Throwable logShipper.onMismatch( 0, new LeaderContext( 0, 0 ) ); // then - assertThat( outbound.sentTo( follower ), hasRaftLogEntries( entry0 ) ); + assertThat( outbound.sentTo( follower ), Matchers.hasRaftLogEntries( singletonList( entry0 ) ) ); } @Test @@ -166,7 +171,7 @@ public void shouldKeepSendingFirstEntryAfterSeveralMismatches() throws Throwable // then assertTrue( outbound.hasEntriesTo( follower, entry0 ) ); - assertThat( outbound.sentTo( follower ), hasRaftLogEntries( entry0 ) ); + assertThat( outbound.sentTo( follower ), Matchers.hasRaftLogEntries( singletonList( entry0 ) ) ); } @Test @@ -186,7 +191,7 @@ public void shouldSendNextBatchAfterMatch() throws Throwable logShipper.onMatch( 0, new LeaderContext( 0, 0 ) ); // then - assertThat( outbound.sentTo( follower ), hasRaftLogEntries( entry1, entry2, entry3 ) ); + assertThat( outbound.sentTo( follower ), Matchers.hasRaftLogEntries( asList( entry1, entry2, entry3 ) ) ); } @Test @@ -207,7 +212,7 @@ public void shouldSendNewEntriesAfterMatchingLastEntry() throws Throwable logShipper.onNewEntries( 1, 0, new RaftLogEntry[]{entry2}, new LeaderContext( 0, 0 ) ); // then - assertThat( outbound.sentTo( follower ), hasRaftLogEntries( entry1, entry2 ) ); + assertThat( outbound.sentTo( follower ), Matchers.hasRaftLogEntries( asList( entry1, entry2 ) ) ); } @Test @@ -244,7 +249,7 @@ public void shouldResendLastSentEntryOnFirstMismatch() throws Throwable logShipper.onMismatch( 1, new LeaderContext( 0, 0 ) ); // then - assertThat( outbound.sentTo( follower ), hasRaftLogEntries( entry2 ) ); + assertThat( outbound.sentTo( follower ), Matchers.hasRaftLogEntries( singletonList( entry2 ) ) ); } @Test @@ -307,7 +312,7 @@ public void shouldSendMostRecentlyAvailableEntryIfPruningHappened() throws IOExc //then assertTrue( outbound.hasAnyEntriesTo( follower ) ); - assertThat( outbound.sentTo( follower ), hasRaftLogEntries( entry3 ) ); + assertThat( outbound.sentTo( follower ), Matchers.hasRaftLogEntries( singletonList( entry3 ) ) ); } @Test diff --git a/enterprise/core-edge/src/test/java/org/neo4j/test/matchers/Matchers.java b/enterprise/core-edge/src/test/java/org/neo4j/test/matchers/Matchers.java index 65ae42a954f91..3e2d96a8cf6f2 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/test/matchers/Matchers.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/test/matchers/Matchers.java @@ -20,6 +20,7 @@ package org.neo4j.test.matchers; import java.util.Arrays; +import java.util.Collection; import java.util.List; import java.util.stream.Collectors; @@ -38,12 +39,12 @@ private Matchers() { } - public static Matcher> hasMessage( RaftMessages.BaseMessage message ) + public static Matcher>> hasMessage( RaftMessages.BaseMessage message ) { - return new TypeSafeMatcher>() + return new TypeSafeMatcher>>() { @Override - protected boolean matchesSafely( List messages ) + protected boolean matchesSafely( List> messages ) { return messages.contains( message ); } @@ -56,12 +57,12 @@ public void describeTo( Description description ) }; } - public static Matcher> hasRaftLogEntries( RaftLogEntry... expectedEntries ) + public static Matcher>> hasRaftLogEntries( Collection expectedEntries ) { - return new TypeSafeMatcher>() + return new TypeSafeMatcher>>() { @Override - protected boolean matchesSafely( List messages ) + protected boolean matchesSafely( List> messages ) { List entries = messages.stream() .filter( message -> message instanceof RaftMessages.AppendEntries.Request ) @@ -69,14 +70,14 @@ protected boolean matchesSafely( List messages ) .flatMap( x -> Arrays.stream( x.entries() ) ) .collect( Collectors.toList() ); - return entries.containsAll( Arrays.asList( expectedEntries ) ); + return entries.containsAll( expectedEntries ); } @Override public void describeTo( Description description ) { - description.appendText( "log entries " + Arrays.toString( expectedEntries ) ); + description.appendText( "log entries " + expectedEntries ); } }; }