diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/storecopy/core/NetworkFlushableByteBuf.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/storecopy/core/NetworkFlushableByteBuf.java index 1a90fa09a0e2a..632b1206841f1 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/storecopy/core/NetworkFlushableByteBuf.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/storecopy/core/NetworkFlushableByteBuf.java @@ -94,4 +94,9 @@ public Flushable prepareForFlush() { return null; } + + public ByteBuf buffer() + { + return delegate; + } } diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/BatchingMessageHandler.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/BatchingMessageHandler.java index c1dbc5cd80765..efb95f3ab2c2c 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/BatchingMessageHandler.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/BatchingMessageHandler.java @@ -27,6 +27,7 @@ import org.neo4j.coreedge.catchup.storecopy.LocalDatabase; import org.neo4j.coreedge.raft.RaftMessages.RaftMessage; import org.neo4j.coreedge.raft.net.Inbound.MessageHandler; +import org.neo4j.kernel.impl.store.StoreId; import org.neo4j.logging.Log; import org.neo4j.logging.LogProvider; @@ -36,17 +37,15 @@ public class BatchingMessageHandler implements Runnable, MessageHandler< { private final Log log; private final MessageHandler> innerHandler; - private final LocalDatabase localDatabase; private final BlockingQueue> messageQueue; private final int maxBatch; private final List> batch; public BatchingMessageHandler( MessageHandler> innerHandler, LogProvider logProvider, - int queueSize, int maxBatch, LocalDatabase localDatabase ) + int queueSize, int maxBatch ) { this.innerHandler = innerHandler; - this.localDatabase = localDatabase; this.log = logProvider.getLog( getClass() ); this.maxBatch = maxBatch; @@ -55,9 +54,9 @@ public BatchingMessageHandler( MessageHandler> innerHandler, } @Override - public boolean validate( RaftMessage message ) + public boolean validate( RaftMessage message, StoreId storeId ) { - return innerHandler.validate( message ); + return innerHandler.validate( message, storeId ); } @Override @@ -115,7 +114,7 @@ private void collateAndHandleBatch( List> batch ) if ( batchRequest == null ) { - batchRequest = new RaftMessages.NewEntry.Batch<>( batch.size(), localDatabase.storeId() ); + batchRequest = new RaftMessages.NewEntry.Batch<>( batch.size() ); } batchRequest.add( newEntryRequest.content() ); } diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/RaftInstance.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/RaftInstance.java index a8c78236a5bcd..762cfb64d31a1 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/RaftInstance.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/RaftInstance.java @@ -51,6 +51,7 @@ import org.neo4j.coreedge.server.core.NotMyselfSelectionStrategy; import org.neo4j.coreedge.server.edge.CoreServerSelectionStrategy; import org.neo4j.kernel.impl.store.MismatchingStoreIdException; +import org.neo4j.kernel.impl.store.StoreId; import org.neo4j.kernel.impl.store.kvstore.Rotation; import org.neo4j.kernel.impl.util.Listener; import org.neo4j.kernel.internal.DatabaseHealth; @@ -309,12 +310,11 @@ else if ( oldLeader != null && !oldLeader.equals( outcome.getLeader() ) ) } @Override - public boolean validate( RaftMessages.RaftMessage incomingMessage ) + public boolean validate( RaftMessages.RaftMessage incomingMessage, StoreId storeId ) { try { - Outcome outcome = currentRole.handler.validate( incomingMessage, state, log, localDatabase ); - + Outcome outcome = currentRole.handler.validate( incomingMessage, storeId, state, localDatabase ); boolean processable = outcome.isProcessable(); if ( !processable ) { diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/RaftMessageHandler.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/RaftMessageHandler.java index 01d05677139aa..fa8c5ec840b7d 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/RaftMessageHandler.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/RaftMessageHandler.java @@ -25,6 +25,7 @@ import org.neo4j.coreedge.raft.outcome.Outcome; import org.neo4j.coreedge.raft.state.RaftState; import org.neo4j.coreedge.raft.state.ReadableRaftState; +import org.neo4j.kernel.impl.store.StoreId; import org.neo4j.logging.Log; public interface RaftMessageHandler @@ -32,6 +33,6 @@ public interface RaftMessageHandler Outcome handle( RaftMessages.RaftMessage message, ReadableRaftState context, Log log, LocalDatabase localDatabase ) throws IOException; - Outcome validate( RaftMessages.RaftMessage message, RaftState context, - Log log, LocalDatabase localDatabase ); + Outcome validate( RaftMessages.RaftMessage message, StoreId storeId, + RaftState context, LocalDatabase localDatabase ); } diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/RaftMessages.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/RaftMessages.java index bb73d5a08da65..172d66033fc39 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/RaftMessages.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/RaftMessages.java @@ -25,6 +25,8 @@ import java.util.List; import java.util.Objects; +import com.sun.org.apache.xml.internal.security.algorithms.MessageDigestAlgorithm; + import org.neo4j.coreedge.network.Message; import org.neo4j.coreedge.raft.log.RaftLogEntry; import org.neo4j.coreedge.raft.replication.ReplicatedContent; @@ -60,7 +62,6 @@ interface RaftMessage extends Message { MEMBER from(); Type type(); - StoreId storeId(); } class Directed @@ -100,10 +101,9 @@ class Request extends BaseMessage private long lastLogIndex; private long lastLogTerm; - public Request( MEMBER from, long term, MEMBER candidate, long lastLogIndex, long lastLogTerm, - StoreId storeId) + public Request( MEMBER from, long term, MEMBER candidate, long lastLogIndex, long lastLogTerm ) { - super( from, Type.VOTE_REQUEST, storeId ); + super( from, Type.VOTE_REQUEST ); this.term = term; this.candidate = candidate; this.lastLogIndex = lastLogIndex; @@ -171,9 +171,9 @@ class Response extends BaseMessage private long term; private boolean voteGranted; - public Response( MEMBER from, long term, boolean voteGranted, StoreId storeId ) + public Response( MEMBER from, long term, boolean voteGranted ) { - super( from, Type.VOTE_RESPONSE, storeId ); + super( from, Type.VOTE_RESPONSE ); this.term = term; this.voteGranted = voteGranted; } @@ -233,9 +233,9 @@ class Request extends BaseMessage private long leaderCommit; public Request( MEMBER from, long leaderTerm, long prevLogIndex, long prevLogTerm, - RaftLogEntry[] entries, long leaderCommit, StoreId storeId ) + RaftLogEntry[] entries, long leaderCommit ) { - super( from, Type.APPEND_ENTRIES_REQUEST, storeId ); + super( from, Type.APPEND_ENTRIES_REQUEST ); Objects.requireNonNull( entries ); assert !((prevLogIndex == -1 && prevLogTerm != -1) || (prevLogTerm == -1 && prevLogIndex != -1)) : format( "prevLogIndex was %d and prevLogTerm was %d", prevLogIndex, prevLogTerm ); @@ -312,10 +312,9 @@ class Response extends BaseMessage private long matchIndex; private long appendIndex; - public Response( MEMBER from, long term, boolean success, long matchIndex, long appendIndex, - StoreId storeId ) + public Response( MEMBER from, long term, boolean success, long matchIndex, long appendIndex ) { - super( from, Type.APPEND_ENTRIES_RESPONSE, storeId ); + super( from, Type.APPEND_ENTRIES_RESPONSE ); this.term = term; this.success = success; this.matchIndex = matchIndex; @@ -373,8 +372,8 @@ public int hashCode() @Override public String toString() { - return format( "AppendEntries.Response from %s {term=%d, storeId=%s, success=%s, matchIndex=%d, appendIndex=%d}", - from, term, storeId(), success, matchIndex, appendIndex ); + return format( "AppendEntries.Response from %s {term=%d, success=%s, matchIndex=%d, appendIndex=%d}", + from, term, success, matchIndex, appendIndex ); } } } @@ -385,9 +384,9 @@ class Heartbeat extends BaseMessage private long commitIndex; private long commitIndexTerm; - public Heartbeat( MEMBER from, long leaderTerm, long commitIndex, long commitIndexTerm, StoreId storeId ) + public Heartbeat( MEMBER from, long leaderTerm, long commitIndex, long commitIndexTerm ) { - super( from, Type.HEARTBEAT, storeId ); + super( from, Type.HEARTBEAT ); this.leaderTerm = leaderTerm; this.commitIndex = commitIndex; this.commitIndexTerm = commitIndexTerm; @@ -454,9 +453,9 @@ class LogCompactionInfo extends BaseMessage private long leaderTerm; private long prevIndex; - public LogCompactionInfo( MEMBER from, long leaderTerm, long prevIndex, StoreId storeId ) + public LogCompactionInfo( MEMBER from, long leaderTerm, long prevIndex ) { - super( from, Type.LOG_COMPACTION_INFO, storeId ); + super( from, Type.LOG_COMPACTION_INFO ); this.leaderTerm = leaderTerm; this.prevIndex = prevIndex; } @@ -515,7 +514,7 @@ class Election extends BaseMessage { public Election( MEMBER from ) { - super( from, Type.ELECTION_TIMEOUT, null ); + super( from, Type.ELECTION_TIMEOUT ); } @Override @@ -529,7 +528,7 @@ class Heartbeat extends BaseMessage { public Heartbeat( MEMBER from ) { - super( from, Type.HEARTBEAT_TIMEOUT, null ); + super( from, Type.HEARTBEAT_TIMEOUT ); } @Override @@ -546,9 +545,9 @@ class Request extends BaseMessage { private ReplicatedContent content; - public Request( MEMBER from, ReplicatedContent content, StoreId storeId ) + public Request( MEMBER from, ReplicatedContent content ) { - super( from, Type.NEW_ENTRY_REQUEST, storeId ); + super( from, Type.NEW_ENTRY_REQUEST ); this.content = content; } @@ -591,9 +590,9 @@ class Batch extends BaseMessage { private List list; - public Batch( int batchSize, StoreId storeId ) + public Batch( int batchSize ) { - super( null, Type.NEW_BATCH_REQUEST, storeId ); + super( null, Type.NEW_BATCH_REQUEST ); list = new ArrayList<>( batchSize ); } @@ -636,17 +635,61 @@ public List contents() } } + class StoreIdAwareMessage implements Message + { + private final StoreId storeId; + private final RaftMessage message; + + public StoreIdAwareMessage( StoreId storeId, RaftMessage message ) + { + Objects.requireNonNull( message ); + this.storeId = storeId; + this.message = message; + } + + public StoreId storeId() + { + return storeId; + } + + public RaftMessage message() + { + return message; + } + + @Override + public boolean equals( Object o ) + { + if ( this == o ) + { + return true; + } + if ( o == null || getClass() != o.getClass() ) + { + return false; + } + StoreIdAwareMessage that = (StoreIdAwareMessage) o; + return Objects.equals( message, that.message ) && + (storeId == that.storeId || (storeId != null && storeId.theRealEquals( that.storeId ))); + } + + @Override + public int hashCode() + { + int result = 31 + (storeId == null ? 0 : storeId.theRealHashCode()); + return 31 * result + Objects.hash( message ); + } + } + abstract class BaseMessage implements RaftMessage { protected MEMBER from; private Type type; - private StoreId storeId; - public BaseMessage( MEMBER from, Type type, StoreId storeId ) + public BaseMessage( MEMBER from, Type type ) { this.from = from; this.type = type; - this.storeId = storeId; } @Override @@ -661,12 +704,6 @@ public Type type() return type; } - @Override - public StoreId storeId() - { - return storeId; - } - @Override public boolean equals( Object o ) { @@ -679,16 +716,13 @@ public boolean equals( Object o ) return false; } BaseMessage that = (BaseMessage) o; - return Objects.equals( from, that.from ) && - type == that.type && - ((storeId == that.storeId) || (storeId != null && storeId.theRealEquals( that.storeId ))); + return Objects.equals( from, that.from ) && type == that.type; } @Override public int hashCode() { - int result = storeId == null ? 0 : storeId.theRealHashCode(); - return 31 * result + Objects.hash( from, type ); + return Objects.hash( from, type ); } } } diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/RaftServer.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/RaftServer.java index ca2bf23e70bda..77e5289e809b8 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/RaftServer.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/RaftServer.java @@ -42,6 +42,7 @@ import org.neo4j.coreedge.server.ListenSocketAddress; import org.neo4j.coreedge.server.logging.ExceptionLoggingHandler; import org.neo4j.helpers.NamedThreadFactory; +import org.neo4j.kernel.impl.store.StoreId; import org.neo4j.kernel.lifecycle.LifecycleAdapter; import org.neo4j.logging.Log; import org.neo4j.logging.LogProvider; @@ -123,13 +124,15 @@ public void registerHandler( Inbound.MessageHandler> + private class RaftMessageHandler extends SimpleChannelInboundHandler> { @Override protected void channelRead0( ChannelHandlerContext channelHandlerContext, - RaftMessages.RaftMessage message ) throws Exception + RaftMessages.StoreIdAwareMessage storeIdAwareMessage ) throws Exception { - if ( messageHandler.validate( message ) ) + RaftMessages.RaftMessage message = storeIdAwareMessage.message(); + StoreId storeId = storeIdAwareMessage.storeId(); + if ( messageHandler.validate( message, storeId ) ) { messageHandler.handle( message ); } diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/net/Inbound.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/net/Inbound.java index f047d97f7ec1c..e746bbda79a94 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/net/Inbound.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/net/Inbound.java @@ -20,6 +20,7 @@ package org.neo4j.coreedge.raft.net; import org.neo4j.coreedge.network.Message; +import org.neo4j.kernel.impl.store.StoreId; public interface Inbound { @@ -27,7 +28,7 @@ public interface Inbound interface MessageHandler { - boolean validate( M message ); + boolean validate( M message, StoreId storeId ); void handle( M message ); } diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/net/LoggingInbound.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/net/LoggingInbound.java index d5030f26a6fa3..ee6834bc804c3 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/net/LoggingInbound.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/net/LoggingInbound.java @@ -23,6 +23,7 @@ import org.neo4j.coreedge.server.AdvertisedSocketAddress; import org.neo4j.coreedge.server.logging.MessageLogger; +import org.neo4j.kernel.impl.store.StoreId; public class LoggingInbound implements Inbound { @@ -44,9 +45,9 @@ public void registerHandler( final MessageHandler handler ) inbound.registerHandler( new MessageHandler() { @Override - public boolean validate( M message ) + public boolean validate( M message, StoreId storeId ) { - return handler.validate( message ); + return handler.validate( message, storeId ); } public synchronized void handle( M message ) diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/net/RaftOutbound.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/net/RaftOutbound.java index 6c2a8b92e620f..72d40cd1c9870 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/net/RaftOutbound.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/net/RaftOutbound.java @@ -19,23 +19,35 @@ */ package org.neo4j.coreedge.raft.net; +import java.util.Arrays; + +import org.neo4j.coreedge.catchup.storecopy.LocalDatabase; import org.neo4j.coreedge.network.Message; +import org.neo4j.coreedge.raft.RaftMessages; +import org.neo4j.coreedge.raft.RaftMessages.RaftMessage; +import org.neo4j.coreedge.raft.RaftMessages.StoreIdAwareMessage; import org.neo4j.coreedge.server.AdvertisedSocketAddress; import org.neo4j.coreedge.server.CoreMember; public class RaftOutbound implements Outbound { private final Outbound outbound; + private final LocalDatabase localDatabase; - public RaftOutbound( Outbound outbound ) + public RaftOutbound( Outbound outbound, LocalDatabase localDatabase ) { this.outbound = outbound; + this.localDatabase = localDatabase; } @Override public void send( CoreMember to, Message... messages ) { - outbound.send( to.getRaftAddress(), messages ); + @SuppressWarnings("unchecked") + StoreIdAwareMessage[] storeIdAwareMessages = Arrays.stream( messages ). + map( m -> new StoreIdAwareMessage<>( localDatabase.storeId(), (RaftMessage) m ) ). + toArray( StoreIdAwareMessage[]::new ); + outbound.send( to.getRaftAddress(), storeIdAwareMessages ); } } diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/net/codecs/RaftMessageDecoder.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/net/codecs/RaftMessageDecoder.java index 972560dc4985c..43d7d941f0a58 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/net/codecs/RaftMessageDecoder.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/net/codecs/RaftMessageDecoder.java @@ -19,13 +19,13 @@ */ package org.neo4j.coreedge.raft.net.codecs; +import java.io.IOException; +import java.util.List; + import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.MessageToMessageDecoder; -import java.io.IOException; -import java.util.List; - import org.neo4j.coreedge.raft.RaftMessages; import org.neo4j.coreedge.raft.log.RaftLogEntry; import org.neo4j.coreedge.raft.net.NetworkReadableClosableChannelNetty4; @@ -58,13 +58,14 @@ public RaftMessageDecoder( ChannelMarshal marshal ) protected void decode( ChannelHandlerContext ctx, ByteBuf buffer, List list ) throws Exception { ReadableChannel channel = new NetworkReadableClosableChannelNetty4( buffer ); - int messageTypeWire = channel.getInt(); + StoreId storeId = StoreIdMarshal.unmarshal( channel ); + int messageTypeWire = channel.getInt(); RaftMessages.Type[] values = RaftMessages.Type.values(); RaftMessages.Type messageType = values[messageTypeWire]; CoreMember from = retrieveMember( channel ); - StoreId storeId = StoreIdMarshal.unmarshal( channel ); + RaftMessages.RaftMessage result; if ( messageType.equals( VOTE_REQUEST ) ) { @@ -74,18 +75,15 @@ protected void decode( ChannelHandlerContext ctx, ByteBuf buffer, List l long lastLogIndex = channel.getLong(); long lastLogTerm = channel.getLong(); - RaftMessages.Vote.Request request = new RaftMessages.Vote.Request<>( - from, term, candidate, lastLogIndex, lastLogTerm, storeId ); - list.add( request ); + result = new RaftMessages.Vote.Request<>( + from, term, candidate, lastLogIndex, lastLogTerm ); } else if ( messageType.equals( VOTE_RESPONSE ) ) { long term = channel.getLong(); boolean voteGranted = channel.get() == 1; - RaftMessages.Vote.Response response = new RaftMessages.Vote.Response<>( from, term, - voteGranted, storeId ); - list.add( response ); + result = new RaftMessages.Vote.Response<>( from, term, voteGranted ); } else if ( messageType.equals( APPEND_ENTRIES_REQUEST ) ) { @@ -105,8 +103,8 @@ else if ( messageType.equals( APPEND_ENTRIES_REQUEST ) ) entries[i] = new RaftLogEntry( entryTerm, content ); } - list.add( new RaftMessages.AppendEntries.Request<>( from, term, prevLogIndex, prevLogTerm, - entries, leaderCommit, storeId ) ); + result = new RaftMessages.AppendEntries.Request<>( from, term, prevLogIndex, prevLogTerm, entries, + leaderCommit ); } else if ( messageType.equals( APPEND_ENTRIES_RESPONSE ) ) { @@ -115,14 +113,13 @@ else if ( messageType.equals( APPEND_ENTRIES_RESPONSE ) ) long matchIndex = channel.getLong(); long appendIndex = channel.getLong(); - list.add( new RaftMessages.AppendEntries.Response<>( from, term, success, matchIndex, - appendIndex, storeId ) ); + result = new RaftMessages.AppendEntries.Response<>( from, term, success, matchIndex, appendIndex ); } else if ( messageType.equals( NEW_ENTRY_REQUEST ) ) { ReplicatedContent content = marshal.unmarshal( channel ); - list.add( new RaftMessages.NewEntry.Request<>( from, content, storeId ) ); + result = new RaftMessages.NewEntry.Request<>( from, content ); } else if ( messageType.equals( HEARTBEAT ) ) { @@ -130,19 +127,21 @@ else if ( messageType.equals( HEARTBEAT ) ) long commitIndexTerm = channel.getLong(); long commitIndex = channel.getLong(); - list.add( new RaftMessages.Heartbeat<>( from, leaderTerm, commitIndex, commitIndexTerm, storeId ) ); + result = new RaftMessages.Heartbeat<>( from, leaderTerm, commitIndex, commitIndexTerm ); } else if ( messageType.equals( LOG_COMPACTION_INFO ) ) { long leaderTerm = channel.getLong(); long prevIndex = channel.getLong(); - list.add( new RaftMessages.LogCompactionInfo<>( from, leaderTerm, prevIndex, storeId ) ); + result = new RaftMessages.LogCompactionInfo<>( from, leaderTerm, prevIndex ); } else { throw new IllegalArgumentException( "Unknown message type" ); } + + list.add( new RaftMessages.StoreIdAwareMessage<>( storeId, result ) ); } private CoreMember retrieveMember( ReadableChannel buffer ) throws IOException diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/net/codecs/RaftMessageEncoder.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/net/codecs/RaftMessageEncoder.java index 8b13b895f1875..1833ef315f87c 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/net/codecs/RaftMessageEncoder.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/net/codecs/RaftMessageEncoder.java @@ -22,7 +22,6 @@ import java.io.IOException; import java.util.List; -import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.MessageToMessageEncoder; @@ -30,13 +29,14 @@ import org.neo4j.coreedge.raft.RaftMessages; import org.neo4j.coreedge.raft.log.RaftLogEntry; import org.neo4j.coreedge.raft.replication.ReplicatedContent; -import org.neo4j.coreedge.raft.state.ChannelMarshal; import org.neo4j.coreedge.raft.replication.storeid.StoreIdMarshal; +import org.neo4j.coreedge.raft.state.ChannelMarshal; import org.neo4j.coreedge.server.AdvertisedSocketAddress; import org.neo4j.coreedge.server.CoreMember; +import org.neo4j.kernel.impl.store.StoreId; import org.neo4j.storageengine.api.WritableChannel; -public class RaftMessageEncoder extends MessageToMessageEncoder> +public class RaftMessageEncoder extends MessageToMessageEncoder> { private final ChannelMarshal marshal; @@ -46,15 +46,17 @@ public RaftMessageEncoder( ChannelMarshal marshal ) } @Override - protected synchronized void encode( ChannelHandlerContext ctx, RaftMessages.RaftMessage message, + protected synchronized void encode( ChannelHandlerContext ctx, + RaftMessages.StoreIdAwareMessage decoratedMessage, List list ) throws Exception { - ByteBuf buffer = ctx.alloc().buffer(); - WritableChannel channel = new NetworkFlushableByteBuf( buffer ); + RaftMessages.RaftMessage message = decoratedMessage.message(); + StoreId storeId = decoratedMessage.storeId(); + NetworkFlushableByteBuf channel = new NetworkFlushableByteBuf( ctx.alloc().buffer() ); + StoreIdMarshal.marshal( storeId, channel ); channel.putInt( message.type().ordinal() ); writeMember( message.from(), channel ); - StoreIdMarshal.marshal( message.storeId(), channel ); if ( message instanceof RaftMessages.Vote.Request ) { @@ -122,7 +124,7 @@ else if( message instanceof RaftMessages.LogCompactionInfo ) throw new IllegalArgumentException( "Unknown message type" ); } - list.add( buffer ); + list.add( channel.buffer() ); } private void writeMember( CoreMember member, WritableChannel buffer ) throws IOException @@ -133,4 +135,6 @@ private void writeMember( CoreMember member, WritableChannel buffer ) throws IOE marshal.marshal( member.getCoreAddress(), buffer ); marshal.marshal( member.getRaftAddress(), buffer ); } + + } diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/replication/LeaderOnlyReplicator.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/replication/LeaderOnlyReplicator.java index da42a906eda9c..72926144d8c5c 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/replication/LeaderOnlyReplicator.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/replication/LeaderOnlyReplicator.java @@ -21,8 +21,6 @@ import org.neo4j.coreedge.raft.RaftMessages; import org.neo4j.coreedge.raft.net.Outbound; -import org.neo4j.coreedge.server.AdvertisedSocketAddress; -import org.neo4j.coreedge.server.CoreMember; import org.neo4j.kernel.impl.store.StoreId; @@ -39,6 +37,6 @@ public LeaderOnlyReplicator( MEMBER source, Outbound outbound ) public void replicate( ReplicatedContent content, StoreId storeId ) { - outbound.send( source, new RaftMessages.NewEntry.Request<>( source, content, storeId ) ); + outbound.send( source, new RaftMessages.NewEntry.Request<>( source, content ) ); } } diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/replication/RaftReplicator.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/replication/RaftReplicator.java index 67b382049dfb7..026a1c749e42c 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/replication/RaftReplicator.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/replication/RaftReplicator.java @@ -25,15 +25,11 @@ import org.neo4j.coreedge.catchup.storecopy.LocalDatabase; import org.neo4j.coreedge.raft.LeaderLocator; import org.neo4j.coreedge.raft.NoLeaderFoundException; -import org.neo4j.coreedge.raft.RaftInstance; import org.neo4j.coreedge.raft.RaftMessages; import org.neo4j.coreedge.raft.net.Outbound; -import org.neo4j.coreedge.raft.net.RaftOutbound; import org.neo4j.coreedge.raft.replication.session.LocalSessionPool; import org.neo4j.coreedge.raft.replication.session.OperationContext; -import org.neo4j.coreedge.raft.replication.tx.ExponentialBackoffStrategy; import org.neo4j.coreedge.raft.replication.tx.RetryStrategy; -import org.neo4j.coreedge.server.CoreMember; import org.neo4j.kernel.impl.util.Listener; /** @@ -83,7 +79,7 @@ public Future replicate( ReplicatedContent command, boolean trackResult RetryStrategy.Timeout timeout = retryStrategy.newTimeout(); do { - outbound.send( leader, new RaftMessages.NewEntry.Request<>( me, operation, localDatabase.storeId() ) ); + outbound.send( leader, new RaftMessages.NewEntry.Request<>( me, operation ) ); try { progress.awaitReplication( timeout.getMillis() ); diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/replication/shipping/RaftLogShipper.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/replication/shipping/RaftLogShipper.java index 24dff1ff4331c..a2999af4d467c 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/replication/shipping/RaftLogShipper.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/replication/shipping/RaftLogShipper.java @@ -27,7 +27,6 @@ import org.neo4j.coreedge.raft.LeaderContext; import org.neo4j.coreedge.raft.RaftMessages; import org.neo4j.coreedge.raft.RenewableTimeoutService; -import org.neo4j.coreedge.raft.log.RaftLogCursor; import org.neo4j.coreedge.raft.log.RaftLogEntry; import org.neo4j.coreedge.raft.log.ReadableRaftLog; import org.neo4j.coreedge.raft.log.segmented.InFlightMap; @@ -97,7 +96,6 @@ enum Mode private final Outbound outbound; private final LogProvider logProvider; - private final LocalDatabase localDatabase; private final Log log; private final ReadableRaftLog raftLog; private final Clock clock; @@ -130,8 +128,7 @@ public enum Timeouts implements RenewableTimeoutService.TimeoutName RaftLogShipper( Outbound outbound, LogProvider logProvider, ReadableRaftLog raftLog, Clock clock, MEMBER leader, MEMBER follower, long leaderTerm, long leaderCommit, long retryTimeMillis, - int catchupBatchSize, int maxAllowedShippingLag, InFlightMap inFlightMap, - LocalDatabase localDatabase ) + int catchupBatchSize, int maxAllowedShippingLag, InFlightMap inFlightMap ) { this.outbound = outbound; this.catchupBatchSize = catchupBatchSize; @@ -145,7 +142,6 @@ public enum Timeouts implements RenewableTimeoutService.TimeoutName this.retryTimeMillis = retryTimeMillis; this.lastLeaderContext = new LeaderContext( leaderTerm, leaderCommit ); this.inFlightMap = inFlightMap; - this.localDatabase = localDatabase; } public Object identity() @@ -381,7 +377,7 @@ private void sendCommitUpdate( LeaderContext leaderContext ) */ RaftMessages.Heartbeat appendRequest = new RaftMessages.Heartbeat<>( leader, leaderContext.term, leaderContext.commitIndex, - leaderContext.term, localDatabase.storeId() ); + leaderContext.term ); outbound.send( follower, appendRequest ); } @@ -403,8 +399,8 @@ private void sendNewEntries( long prevLogIndex, long prevLogTerm, RaftLogEntry[] lastSentIndex = prevLogIndex + 1; RaftMessages.AppendEntries.Request appendRequest = new RaftMessages.AppendEntries.Request<>( - leader, leaderContext.term, prevLogIndex, prevLogTerm, newEntries, leaderContext.commitIndex, - localDatabase.storeId() ); + leader, leaderContext.term, prevLogIndex, prevLogTerm, newEntries, leaderContext.commitIndex + ); outbound.send( follower, appendRequest ); } @@ -439,13 +435,13 @@ private void sendRange( long startIndex, long endIndex, LeaderContext leaderCont "Sending a LogCompactionInfo instead. Leader context=%s, prevLogTerm=%d", statusAsString(), leaderContext, prevLogTerm ); outbound.send( follower, new RaftMessages.LogCompactionInfo<>( leader, leaderContext.term, - prevLogIndex, localDatabase.storeId() ) ); + prevLogIndex ) ); return; } RaftMessages.AppendEntries.Request appendRequest = new RaftMessages.AppendEntries.Request<>( leader, leaderContext.term, prevLogIndex, prevLogTerm, - entries, leaderContext.commitIndex, localDatabase.storeId() ); + entries, leaderContext.commitIndex ); try ( InFlightLogEntrySupplier logEntrySupplier = new InFlightLogEntrySupplier( raftLog, inFlightMap ) ) { diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/replication/shipping/RaftLogShippingManager.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/replication/shipping/RaftLogShippingManager.java index c36f01a774a44..4f37268ed0501 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/replication/shipping/RaftLogShippingManager.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/replication/shipping/RaftLogShippingManager.java @@ -116,7 +116,7 @@ private RaftLogShipper ensureLogShipperRunning( MEMBER member, LeaderContext lea { logShipper = new RaftLogShipper<>( outbound, logProvider, raftLog, clock, myself, member, leaderContext.term, leaderContext.commitIndex, retryTimeMillis, catchupBatchSize, - maxAllowedShippingLag, inFlightMap, localDatabase ); + maxAllowedShippingLag, inFlightMap ); logShippers.put( member, logShipper ); diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/roles/Appending.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/roles/Appending.java index f510e6c935c42..8053347cb6bd1 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/roles/Appending.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/roles/Appending.java @@ -42,7 +42,7 @@ public static void handleAppendEntriesRequest( if ( request.leaderTerm() < state.term() ) { RaftMessages.AppendEntries.Response appendResponse = new RaftMessages.AppendEntries.Response<>( - state.myself(), state.term(), false, -1, state.entryLog().appendIndex(),localStoreId ); + state.myself(), state.term(), false, -1, state.entryLog().appendIndex() ); outcome.addOutgoingMessage( new RaftMessages.Directed<>( request.from(), appendResponse ) ); return; @@ -57,7 +57,7 @@ public static void handleAppendEntriesRequest( { assert request.prevLogIndex() > -1 && request.prevLogTerm() > -1; RaftMessages.AppendEntries.Response appendResponse = new RaftMessages.AppendEntries.Response<>( - state.myself(), request.leaderTerm(), false, -1, state.entryLog().appendIndex(), localStoreId ); + state.myself(), request.leaderTerm(), false, -1, state.entryLog().appendIndex() ); outcome.addOutgoingMessage( new RaftMessages.Directed<>( request.from(), appendResponse ) ); return; @@ -108,7 +108,7 @@ else if ( logTerm != request.entries()[offset].term() ) if ( endMatchIndex >= 0 ) { RaftMessages.AppendEntries.Response appendResponse = new RaftMessages.AppendEntries.Response<>( - state.myself(), request.leaderTerm(), true, endMatchIndex, endMatchIndex, localStoreId ); + state.myself(), request.leaderTerm(), true, endMatchIndex, endMatchIndex ); outcome.addOutgoingMessage( new RaftMessages.Directed<>( request.from(), appendResponse ) ); } } diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/roles/Candidate.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/roles/Candidate.java index b93b7bc8ffdbd..44a553115c32c 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/roles/Candidate.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/roles/Candidate.java @@ -28,6 +28,7 @@ import org.neo4j.coreedge.raft.outcome.Outcome; import org.neo4j.coreedge.raft.state.RaftState; import org.neo4j.coreedge.raft.state.ReadableRaftState; +import org.neo4j.kernel.impl.store.StoreId; import org.neo4j.logging.Log; import static org.neo4j.coreedge.raft.MajorityIncludingSelfQuorum.isQuorum; @@ -69,7 +70,7 @@ public Outcome handle( RaftMessages.RaftMessage message { RaftMessages.AppendEntries.Response appendResponse = new RaftMessages.AppendEntries.Response<>( ctx.myself(), ctx.term(), false, - req.prevLogIndex(), ctx.entryLog().appendIndex(), localDatabase.storeId() ); + req.prevLogIndex(), ctx.entryLog().appendIndex() ); outcome.addOutgoingMessage( new RaftMessages.Directed<>( req.from(), appendResponse ) ); break; @@ -133,7 +134,7 @@ else if ( res.term() < ctx.term() || !res.voteGranted() ) } outcome.addOutgoingMessage( new RaftMessages.Directed<>( req.from(), - new RaftMessages.Vote.Response<>( ctx.myself(), outcome.getTerm(), false, localDatabase.storeId() ) ) ); + new RaftMessages.Vote.Response<>( ctx.myself(), outcome.getTerm(), false ) ) ); break; } @@ -152,8 +153,8 @@ else if ( res.term() < ctx.term() || !res.voteGranted() ) } @Override - public Outcome validate( RaftMessages.RaftMessage message, RaftState ctx, - Log log, LocalDatabase localDatabase ) + public Outcome validate( RaftMessages.RaftMessage message, StoreId storeId, + RaftState ctx, LocalDatabase localDatabase ) { return new Outcome<>( CANDIDATE, ctx ); } diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/roles/Election.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/roles/Election.java index 609e46e9ca8e4..b360b7a0b1e6a 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/roles/Election.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/roles/Election.java @@ -45,7 +45,7 @@ public static boolean start( ReadableRaftState ctx, Outcome voteForMe = new RaftMessages.Vote.Request<>( ctx.myself(), outcome.getTerm(), ctx.myself(), ctx.entryLog() - .appendIndex(), ctx.entryLog().readEntryTerm( ctx.entryLog().appendIndex() ), storeId ); + .appendIndex(), ctx.entryLog().readEntryTerm( ctx.entryLog().appendIndex() ) ); currentMembers.stream().filter( member -> !member.equals( ctx.myself() ) ).forEach( member -> outcome.addOutgoingMessage( new RaftMessages.Directed<>( member, voteForMe ) ) diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/roles/Follower.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/roles/Follower.java index 3bd0c2815ea60..2d92940f4125d 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/roles/Follower.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/roles/Follower.java @@ -128,16 +128,16 @@ public Outcome handle( RaftMessages.RaftMessage message } @Override - public Outcome validate( RaftMessages.RaftMessage message, RaftState ctx, - Log log, LocalDatabase localDatabase ) + public Outcome validate( RaftMessages.RaftMessage message, StoreId storeId, + RaftState ctx, LocalDatabase localDatabase ) { localDatabase.assertHealthy( IllegalStateException.class ); Outcome outcome = new Outcome<>( FOLLOWER, ctx ); - StoreId storeId = localDatabase.storeId(); + StoreId localStoreId = localDatabase.storeId(); if ( outcome.getLeader() != null && message.type() != RaftMessages.Type.HEARTBEAT_TIMEOUT && - !storeId.theRealEquals( message.storeId() ) ) + !localStoreId.theRealEquals( storeId ) ) { if ( localDatabase.isEmpty() ) { @@ -146,7 +146,7 @@ public Outcome validate( RaftMessages.RaftMessage messa } else if ( message.type() != RaftMessages.Type.VOTE_REQUEST ) { - throw new MismatchingStoreIdException( message.storeId(), storeId ); + throw new MismatchingStoreIdException( storeId, localStoreId ); } else { diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/roles/Leader.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/roles/Leader.java index f048ead05d278..da45f164d46bb 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/roles/Leader.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/roles/Leader.java @@ -40,7 +40,6 @@ import static java.lang.Math.max; -import static org.neo4j.coreedge.raft.roles.Role.CANDIDATE; import static org.neo4j.coreedge.raft.roles.Role.FOLLOWER; import static org.neo4j.coreedge.raft.roles.Role.LEADER; @@ -57,7 +56,7 @@ private static void sendHeartbeats( ReadableRaftState ctx, long commitIndex = ctx.leaderCommit(); long commitIndexTerm = ctx.entryLog().readEntryTerm( commitIndex ); Heartbeat heartbeat = new Heartbeat<>( ctx.myself(), ctx.term(), commitIndex, - commitIndexTerm, storeId ); + commitIndexTerm ); for ( MEMBER to : replicationTargets( ctx ) ) { outcome.addOutgoingMessage( new RaftMessages.Directed<>( to, heartbeat ) ); @@ -103,7 +102,7 @@ public Outcome handle( RaftMessages.RaftMessage message { RaftMessages.AppendEntries.Response appendResponse = new RaftMessages.AppendEntries.Response<>( ctx.myself(), ctx.term(), false, req.prevLogIndex(), - ctx.entryLog().appendIndex(), localDatabase.storeId() ); + ctx.entryLog().appendIndex() ); outcome.addOutgoingMessage( new RaftMessages.Directed<>( req.from(), appendResponse ) ); break; @@ -196,8 +195,7 @@ else if ( response.term() > ctx.term() ) // There are no earlier entries, message the follower that we have compacted so that // it can take appropriate action. outcome.addOutgoingMessage( new RaftMessages.Directed<>( response.from(), - new RaftMessages.LogCompactionInfo<>( ctx.myself(), ctx.term(), - ctx.entryLog().prevIndex(), localDatabase.storeId() ) ) ); + new RaftMessages.LogCompactionInfo<>( ctx.myself(), ctx.term(), ctx.entryLog().prevIndex() ) ) ); } } break; @@ -219,7 +217,7 @@ else if ( response.term() > ctx.term() ) } outcome.addOutgoingMessage( new RaftMessages.Directed<>( req.from(), - new RaftMessages.Vote.Response<>( ctx.myself(), ctx.term(), false, localDatabase.storeId() ) ) ); + new RaftMessages.Vote.Response<>( ctx.myself(), ctx.term(), false ) ) ); break; } @@ -244,8 +242,8 @@ else if ( response.term() > ctx.term() ) } @Override - public Outcome validate( RaftMessages.RaftMessage message, RaftState ctx, - Log log, LocalDatabase localDatabase ) + public Outcome validate( RaftMessages.RaftMessage message, StoreId storeId, + RaftState ctx, LocalDatabase localDatabase ) { return new Outcome<>( LEADER, ctx ); } diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/roles/Voting.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/roles/Voting.java index 9433c5b95a402..c91f5ed2be116 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/roles/Voting.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/roles/Voting.java @@ -51,7 +51,7 @@ public static void handleVoteRequest( ReadableRaftState state, outcome.addOutgoingMessage( new RaftMessages.Directed<>( voteRequest.from(), new RaftMessages.Vote.Response<>( state.myself(), outcome.getTerm(), - willVoteForCandidate, storeId ) ) ); + willVoteForCandidate ) ) ); } public static boolean shouldVoteFor( MEMBER candidate, long contextTerm, long requestTerm, diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/core/EnterpriseCoreEditionModule.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/core/EnterpriseCoreEditionModule.java index 2e2976d17f43a..657b023635587 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/core/EnterpriseCoreEditionModule.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/core/EnterpriseCoreEditionModule.java @@ -380,7 +380,7 @@ fileSystem, new File( clusterStateDirectory, "session-tracker-state" ), "session } RaftReplicator replicator = new RaftReplicator<>( raft, myself, - new RaftOutbound( loggingOutbound ), sessionPool, progressTracker, + new RaftOutbound( loggingOutbound, localDatabase ), sessionPool, progressTracker, new ExponentialBackoffStrategy( 10, SECONDS ), localDatabase ); dependencies.satisfyDependency( raft ); @@ -681,7 +681,7 @@ fileSystem, new File( clusterStateDirectory, "term-state" ), "term-state", CoreMemberSetBuilder memberSetBuilder = new CoreMemberSetBuilder(); - RaftOutbound raftOutbound = new RaftOutbound( outbound ); + RaftOutbound raftOutbound = new RaftOutbound( outbound, localDatabase ); LeaderOnlyReplicator leaderOnlyReplicator = new LeaderOnlyReplicator<>( myself, raftOutbound ); RaftMembershipManager raftMembershipManager = new RaftMembershipManager<>( leaderOnlyReplicator, @@ -704,7 +704,7 @@ raftTimeoutService, new NotMyselfSelectionStrategy( discoveryService, myself ), int queueSize = config.get( CoreEdgeClusterSettings.raft_in_queue_size ); int maxBatch = config.get( CoreEdgeClusterSettings.raft_in_queue_max_batch ); BatchingMessageHandler batchingMessageHandler = - new BatchingMessageHandler<>( raftInstance, logProvider, queueSize, maxBatch, localDatabase ); + new BatchingMessageHandler<>( raftInstance, logProvider, queueSize, maxBatch ); life.add( new ContinuousJob( jobScheduler, new JobScheduler.Group( "raft-batch-handler", NEW_THREAD ), batchingMessageHandler ) ); diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/AppendEntriesRequestBuilder.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/AppendEntriesRequestBuilder.java index 06ecef82a080c..153c38a9394c7 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/AppendEntriesRequestBuilder.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/AppendEntriesRequestBuilder.java @@ -38,7 +38,7 @@ public class AppendEntriesRequestBuilder public RaftMessages.AppendEntries.Request build() { return new RaftMessages.AppendEntries.Request<>( from, leaderTerm, prevLogIndex, prevLogTerm, - logEntries.toArray( new RaftLogEntry[logEntries.size()] ), leaderCommit, storeId ); + logEntries.toArray( new RaftLogEntry[logEntries.size()] ), leaderCommit ); } public AppendEntriesRequestBuilder from( MEMBER from ) diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/AppendEntriesResponseBuilder.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/AppendEntriesResponseBuilder.java index ae0fe02938b41..6d0fee193e444 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/AppendEntriesResponseBuilder.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/AppendEntriesResponseBuilder.java @@ -34,7 +34,7 @@ public RaftMessages.AppendEntries.Response build() { // a response of false should always have a match index of -1 assert !(success == false && matchIndex != -1); - return new RaftMessages.AppendEntries.Response<>( from, term, success, matchIndex, appendIndex, storeId ); + return new RaftMessages.AppendEntries.Response<>( from, term, success, matchIndex, appendIndex ); } public AppendEntriesResponseBuilder from( MEMBER from ) diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/BatchingMessageHandlerTest.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/BatchingMessageHandlerTest.java index d4fe572c8d4ba..f197c79787368 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/BatchingMessageHandlerTest.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/BatchingMessageHandlerTest.java @@ -20,7 +20,6 @@ package org.neo4j.coreedge.raft; import org.junit.Test; -import org.mockito.cglib.core.Local; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -49,9 +48,9 @@ public void shouldInvokeInnerHandlerWhenRun() throws Exception Inbound.MessageHandler> innerHandler = mock( Inbound.MessageHandler.class ); BatchingMessageHandler batchHandler = new BatchingMessageHandler<>( - innerHandler, NullLogProvider.getInstance(), QUEUE_SIZE, MAX_BATCH, localDatabase ); - RaftMessages.NewEntry.Request message = new RaftMessages.NewEntry.Request<>( null, null, - localDatabase.storeId() ); + innerHandler, NullLogProvider.getInstance(), QUEUE_SIZE, MAX_BATCH ); + RaftMessages.NewEntry.Request message = new RaftMessages.NewEntry.Request<>( null, null + ); batchHandler.handle( message ); verifyZeroInteractions( innerHandler ); @@ -71,9 +70,9 @@ public void shouldInvokeHandlerOnQueuedMessage() throws Exception Inbound.MessageHandler> innerHandler = mock( Inbound.MessageHandler.class ); BatchingMessageHandler batchHandler = new BatchingMessageHandler<>( - innerHandler, NullLogProvider.getInstance(), QUEUE_SIZE, MAX_BATCH, localDatabase ); - RaftMessages.NewEntry.Request message = new RaftMessages.NewEntry.Request<>( null, null, - localDatabase.storeId()); + innerHandler, NullLogProvider.getInstance(), QUEUE_SIZE, MAX_BATCH ); + RaftMessages.NewEntry.Request message = new RaftMessages.NewEntry.Request<>( null, null + ); ExecutorService executor = Executors.newCachedThreadPool(); Future future = executor.submit( batchHandler ); @@ -100,13 +99,13 @@ public void shouldBatchRequests() throws Exception Inbound.MessageHandler> innerHandler = mock( Inbound.MessageHandler.class ); BatchingMessageHandler batchHandler = new BatchingMessageHandler<>( - innerHandler, NullLogProvider.getInstance(), QUEUE_SIZE, MAX_BATCH, localDatabase ); + innerHandler, NullLogProvider.getInstance(), QUEUE_SIZE, MAX_BATCH ); ReplicatedString contentA = new ReplicatedString( "A" ); ReplicatedString contentB = new ReplicatedString( "B" ); - RaftMessages.NewEntry.Request messageA = new RaftMessages.NewEntry.Request<>( null, contentA, - localDatabase.storeId() ); - RaftMessages.NewEntry.Request messageB = new RaftMessages.NewEntry.Request<>( null, contentB, - localDatabase.storeId()); + RaftMessages.NewEntry.Request messageA = new RaftMessages.NewEntry.Request<>( null, contentA + ); + RaftMessages.NewEntry.Request messageB = new RaftMessages.NewEntry.Request<>( null, contentB + ); batchHandler.handle( messageA ); batchHandler.handle( messageB ); @@ -116,7 +115,7 @@ public void shouldBatchRequests() throws Exception batchHandler.run(); // then - RaftMessages.NewEntry.Batch batch = new RaftMessages.NewEntry.Batch<>( 2, localDatabase.storeId() ); + RaftMessages.NewEntry.Batch batch = new RaftMessages.NewEntry.Batch<>( 2 ); batch.add( contentA ); batch.add( contentB ); verify( innerHandler ).handle( batch ); @@ -130,17 +129,17 @@ public void shouldBatchNewEntriesAndHandleOtherMessagesSingularly() throws Excep Inbound.MessageHandler> innerHandler = mock( Inbound.MessageHandler.class ); BatchingMessageHandler batchHandler = new BatchingMessageHandler<>( - innerHandler, NullLogProvider.getInstance(), QUEUE_SIZE, MAX_BATCH, localDatabase ); + innerHandler, NullLogProvider.getInstance(), QUEUE_SIZE, MAX_BATCH ); ReplicatedString contentA = new ReplicatedString( "A" ); ReplicatedString contentC = new ReplicatedString( "C" ); - RaftMessages.NewEntry.Request messageA = new RaftMessages.NewEntry.Request<>( null, contentA, - localDatabase.storeId()); - RaftMessages.Heartbeat messageB = new RaftMessages.Heartbeat<>( null, 0, 0, 0, - localDatabase.storeId()); - RaftMessages.NewEntry.Request messageC = new RaftMessages.NewEntry.Request<>( null, contentC, - localDatabase.storeId()); - RaftMessages.Heartbeat messageD = new RaftMessages.Heartbeat<>( null, 1, 1, 1, localDatabase.storeId() ); + RaftMessages.NewEntry.Request messageA = new RaftMessages.NewEntry.Request<>( null, contentA + ); + RaftMessages.Heartbeat messageB = new RaftMessages.Heartbeat<>( null, 0, 0, 0 + ); + RaftMessages.NewEntry.Request messageC = new RaftMessages.NewEntry.Request<>( null, contentC + ); + RaftMessages.Heartbeat messageD = new RaftMessages.Heartbeat<>( null, 1, 1, 1 ); batchHandler.handle( messageA ); batchHandler.handle( messageB ); @@ -152,7 +151,7 @@ public void shouldBatchNewEntriesAndHandleOtherMessagesSingularly() throws Excep batchHandler.run(); // then - RaftMessages.NewEntry.Batch batch = new RaftMessages.NewEntry.Batch<>( 2, localDatabase.storeId() ); + RaftMessages.NewEntry.Batch batch = new RaftMessages.NewEntry.Batch<>( 2 ); batch.add( contentA ); batch.add( contentC ); diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/CatchUpTest.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/CatchUpTest.java index 48e37eaffcaeb..b79773ab2bb16 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/CatchUpTest.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/CatchUpTest.java @@ -58,7 +58,7 @@ public void happyClusterPropagatesUpdates() throws Throwable // when fixture.members().withId( leader ).timeoutService().invokeTimeout( RaftInstance.Timeouts.ELECTION ); net.processMessages(); - fixture.members().withId( leader ).raftInstance().handle( new Request<>( leaderMember, valueOf( 42 ), storeId ) ); + fixture.members().withId( leader ).raftInstance().handle( new Request<>( leaderMember, valueOf( 42 ) ) ); net.processMessages(); // then @@ -91,10 +91,10 @@ public void newMemberWithNoLogShouldCatchUpFromPeers() throws Throwable net.disconnect( sleepyId ); // when - fixture.members().withId( leaderId ).raftInstance().handle( new Request<>( leader, valueOf( 10 ), storeId ) ); - fixture.members().withId( leaderId ).raftInstance().handle( new Request<>( leader, valueOf( 20 ), storeId ) ); - fixture.members().withId( leaderId ).raftInstance().handle( new Request<>( leader, valueOf( 30 ), storeId ) ); - fixture.members().withId( leaderId ).raftInstance().handle( new Request<>( leader, valueOf( 40 ), storeId ) ); + fixture.members().withId( leaderId ).raftInstance().handle( new Request<>( leader, valueOf( 10 ) ) ); + fixture.members().withId( leaderId ).raftInstance().handle( new Request<>( leader, valueOf( 20 ) ) ); + fixture.members().withId( leaderId ).raftInstance().handle( new Request<>( leader, valueOf( 30 ) ) ); + fixture.members().withId( leaderId ).raftInstance().handle( new Request<>( leader, valueOf( 40 ) ) ); net.processMessages(); // then diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/HeartbeatBuilder.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/HeartbeatBuilder.java index f67ee7b226476..0d3588af23065 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/HeartbeatBuilder.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/HeartbeatBuilder.java @@ -31,7 +31,7 @@ public class HeartbeatBuilder public RaftMessages.Heartbeat build() { - return new RaftMessages.Heartbeat<>( from, leaderTerm, commitIndex, commitIndexTerm, storeId ); + return new RaftMessages.Heartbeat<>( from, leaderTerm, commitIndex, commitIndexTerm ); } public HeartbeatBuilder from( MEMBER from ) diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/RaftInstanceTest.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/RaftInstanceTest.java index 12e6d0ae0f380..4caea5a876d03 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/RaftInstanceTest.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/RaftInstanceTest.java @@ -392,7 +392,7 @@ public void newMembersShouldBeIncludedInHeartbeatMessages() throws Exception newMemberInbound.registerHandler( new Inbound.MessageHandler() { @Override - public boolean validate( Message message ) + public boolean validate( Message message, StoreId storeId ) { return true; } @@ -471,7 +471,7 @@ public void shouldPanicWhenFailingToHandleMessageUnderNormalConditions() throws // when raft.handle( new RaftMessages.AppendEntries.Request<>( member1, 0, -1, -1, - new RaftLogEntry[]{new RaftLogEntry( 0, new ReplicatedString( "hello" ) )}, 0, storeId ) ); + new RaftLogEntry[]{new RaftLogEntry( 0, new ReplicatedString( "hello" ) )}, 0 ) ); // then assertTrue( databaseHealth.hasPanicked() ); diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/VoteRequestBuilder.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/VoteRequestBuilder.java index 0f1af17db7ca5..b1a7b05e593a6 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/VoteRequestBuilder.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/VoteRequestBuilder.java @@ -32,7 +32,7 @@ public class VoteRequestBuilder public RaftMessages.Vote.Request build() { - return new RaftMessages.Vote.Request<>( from, term, candidate, lastLogIndex, lastLogTerm, storeId ); + return new RaftMessages.Vote.Request<>( from, term, candidate, lastLogIndex, lastLogTerm ); } public VoteRequestBuilder from( MEMBER from ) diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/VoteResponseBuilder.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/VoteResponseBuilder.java index 340a86ab013df..1fb410a7dbc52 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/VoteResponseBuilder.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/VoteResponseBuilder.java @@ -19,18 +19,15 @@ */ package org.neo4j.coreedge.raft; -import org.neo4j.kernel.impl.store.StoreId; - public class VoteResponseBuilder { boolean voteGranted = false; private long term = -1; private MEMBER from = null; - private StoreId storeId = new StoreId( 1, 2, 3, 4, 5 );; public RaftMessages.Vote.Response build() { - return new RaftMessages.Vote.Response<>( from, term, voteGranted, storeId ); + return new RaftMessages.Vote.Response<>( from, term, voteGranted ); } public VoteResponseBuilder from( MEMBER from ) diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/net/RaftMessageProcessingTest.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/net/RaftMessageProcessingTest.java index 2344cb13a8511..25448afcb2178 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/net/RaftMessageProcessingTest.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/net/RaftMessageProcessingTest.java @@ -36,10 +36,10 @@ import org.neo4j.coreedge.raft.state.ChannelMarshal; import org.neo4j.coreedge.server.AdvertisedSocketAddress; import org.neo4j.coreedge.server.CoreMember; +import org.neo4j.kernel.impl.store.StoreId; import org.neo4j.storageengine.api.ReadPastEndException; import org.neo4j.storageengine.api.ReadableChannel; import org.neo4j.storageengine.api.WritableChannel; -import org.neo4j.kernel.impl.store.StoreId; import static org.junit.Assert.assertEquals; @@ -90,7 +90,6 @@ public ReplicatedContent unmarshal( ReadableChannel channel ) throws IOException }; private EmbeddedChannel channel; - private StoreId storeId = new StoreId( 1, 2, 3, 4, 5 ); @Before public void setup() @@ -104,7 +103,7 @@ public void shouldEncodeAndDecodeVoteRequest() // given CoreMember member = new CoreMember( new AdvertisedSocketAddress( "host1:9000" ), new AdvertisedSocketAddress( "host1:9001" ) ); - RaftMessages.Vote.Request request = new RaftMessages.Vote.Request<>( member, 1, member, 1, 1, storeId ); + RaftMessages.Vote.Request request = new RaftMessages.Vote.Request<>( member, 1, member, 1, 1 ); // when channel.writeOutbound( request ); @@ -120,7 +119,7 @@ public void shouldEncodeAndDecodeVoteResponse() // given CoreMember member = new CoreMember( new AdvertisedSocketAddress( "host1:9000" ), new AdvertisedSocketAddress( "host1:9001" ) ); - RaftMessages.Vote.Response response = new RaftMessages.Vote.Response<>( member, 1, true, storeId ); + RaftMessages.Vote.Response response = new RaftMessages.Vote.Response<>( member, 1, true ); // when channel.writeOutbound( response ); @@ -138,8 +137,8 @@ public void shouldEncodeAndDecodeAppendEntriesRequest() new AdvertisedSocketAddress( "host1:9001" ) ); RaftLogEntry logEntry = new RaftLogEntry( 1, ReplicatedInteger.valueOf( 1 ) ); RaftMessages.AppendEntries.Request request = - new RaftMessages.AppendEntries.Request<>( member, 1, 1, 99, new RaftLogEntry[] { logEntry }, 1, - storeId ); + new RaftMessages.AppendEntries.Request<>( member, 1, 1, 99, new RaftLogEntry[] { logEntry }, 1 + ); // when channel.writeOutbound( request ); @@ -156,7 +155,7 @@ public void shouldEncodeAndDecodeAppendEntriesResponse() CoreMember member = new CoreMember( new AdvertisedSocketAddress( "host1:9000" ), new AdvertisedSocketAddress( "host1:9001" ) ); RaftMessages.AppendEntries.Response response = - new RaftMessages.AppendEntries.Response<>( member, 1, false, -1, 0, storeId ); + new RaftMessages.AppendEntries.Response<>( member, 1, false, -1, 0 ); // when channel.writeOutbound( response ); @@ -173,7 +172,7 @@ public void shouldEncodeAndDecodeNewEntryRequest() CoreMember member = new CoreMember( new AdvertisedSocketAddress( "host1:9000" ), new AdvertisedSocketAddress( "host1:9001" ) ); RaftMessages.NewEntry.Request request = - new RaftMessages.NewEntry.Request<>( member, ReplicatedInteger.valueOf( 12 ), storeId ); + new RaftMessages.NewEntry.Request<>( member, ReplicatedInteger.valueOf( 12 ) ); // when channel.writeOutbound( request ); diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/net/codecs/RaftMessageEncodingDecodingTest.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/net/codecs/RaftMessageEncodingDecodingTest.java index be864f57fe481..c197d825b2354 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/net/codecs/RaftMessageEncodingDecodingTest.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/net/codecs/RaftMessageEncodingDecodingTest.java @@ -49,6 +49,9 @@ public class RaftMessageEncodingDecodingTest { + + private StoreId storeId = new StoreId( 1, 2, 3, 4, 5 ); + @Test public void shouldSerializeAppendRequestWithMultipleEntries() throws Exception { @@ -105,8 +108,8 @@ public void shouldSerializeHeartbeats() throws Exception // When CoreMember sender = new CoreMember( new AdvertisedSocketAddress( "127.0.0.1:5001" ), new AdvertisedSocketAddress( "127.0.0.2:5001" ) ); - RaftMessages.Heartbeat message = new RaftMessages.Heartbeat<>( sender, 1, 2, 3, - new StoreId( 1, 2, 3, 4, 5 ) ); + RaftMessages.StoreIdAwareMessage message = new RaftMessages.StoreIdAwareMessage<>( storeId, + new RaftMessages.Heartbeat<>( sender, 1, 2, 3 ) ); encoder.encode( setupContext(), message, resultingBuffers ); // Then @@ -125,7 +128,7 @@ public void shouldSerializeVoteRequest() throws Exception { CoreMember sender = new CoreMember( new AdvertisedSocketAddress( "127.0.0.1:5001" ), new AdvertisedSocketAddress( "127.0.0.2:5001" ) ); - RaftMessages.Vote.Request request = new VoteRequestBuilder<>() + RaftMessages.Vote.Request request = new VoteRequestBuilder() .candidate( sender ) .from( sender ) .lastLogIndex( 2 ) @@ -140,7 +143,7 @@ public void shouldSerializeVoteResponse() throws Exception { CoreMember sender = new CoreMember( new AdvertisedSocketAddress( "127.0.0.1:5001" ), new AdvertisedSocketAddress( "127.0.0.2:5001" ) ); - RaftMessages.Vote.Response request = new VoteResponseBuilder<>() + RaftMessages.Vote.Response request = new VoteResponseBuilder() .from( sender ) .grant() .term( 3 ) @@ -148,7 +151,7 @@ public void shouldSerializeVoteResponse() throws Exception serializeReadBackAndVerifyMessage( request ); } - private void serializeReadBackAndVerifyMessage( RaftMessages.RaftMessage message ) throws Exception + private void serializeReadBackAndVerifyMessage( RaftMessages.RaftMessage message ) throws Exception { // Given RaftMessageEncoder encoder = new RaftMessageEncoder( marshal ); @@ -160,7 +163,9 @@ private void serializeReadBackAndVerifyMessage( RaftMessages.RaftMessage message ArrayList thingsRead = new ArrayList<>( 1 ); // When - encoder.encode( setupContext(), message, resultingBuffers ); + RaftMessages.StoreIdAwareMessage decoratedMessage = + new RaftMessages.StoreIdAwareMessage<>( storeId, message ); + encoder.encode( setupContext(), decoratedMessage, resultingBuffers ); // Then assertEquals( 1, resultingBuffers.size() ); @@ -170,7 +175,7 @@ private void serializeReadBackAndVerifyMessage( RaftMessages.RaftMessage message // Then assertEquals( 1, thingsRead.size() ); - assertEquals( message, thingsRead.get( 0 ) ); + assertEquals( decoratedMessage, thingsRead.get( 0 ) ); } private static ChannelHandlerContext setupContext() diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/replication/shipping/RaftLogShipperTest.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/replication/shipping/RaftLogShipperTest.java index fc64e626d9094..4d22766eb6a68 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/replication/shipping/RaftLogShipperTest.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/replication/shipping/RaftLogShipperTest.java @@ -19,22 +19,17 @@ */ package org.neo4j.coreedge.raft.replication.shipping; -import org.hamcrest.Description; -import org.hamcrest.Matcher; -import org.hamcrest.TypeSafeMatcher; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; - import java.io.IOException; import java.time.Clock; import java.util.ArrayList; import java.util.Collection; -import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + import org.neo4j.coreedge.catchup.storecopy.LocalDatabase; -import org.neo4j.coreedge.network.Message; import org.neo4j.coreedge.raft.LeaderContext; import org.neo4j.coreedge.raft.OutboundMessageCollector; import org.neo4j.coreedge.raft.RaftMessages; @@ -59,7 +54,6 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; -import static org.neo4j.helpers.collection.Iterators.asSet; import static org.neo4j.test.matchers.Matchers.hasMessage; import static org.neo4j.test.matchers.Matchers.hasRaftLogEntries; @@ -116,12 +110,12 @@ public void teardown() private void startLogShipper() { LocalDatabase localDatabase = mock( LocalDatabase.class ); - when(localDatabase.storeId()).thenReturn( storeId ); + when( localDatabase.storeId() ).thenReturn( storeId ); logShipper = new RaftLogShipper<>( outbound, logProvider, raftLog, clock, leader, follower, leaderTerm, leaderCommit, - retryTimeMillis, catchupBatchSize, maxAllowedShippingLag, new InFlightMap<>(), - localDatabase ); + retryTimeMillis, catchupBatchSize, maxAllowedShippingLag, new InFlightMap<>() + ); logShipper.start(); } @@ -213,7 +207,7 @@ public void shouldSendNewEntriesAfterMatchingLastEntry() throws Throwable logShipper.onNewEntries( 1, 0, new RaftLogEntry[]{entry2}, new LeaderContext( 0, 0 ) ); // then - assertThat( outbound.sentTo( follower ), hasRaftLogEntries( entry1, entry2) ); + assertThat( outbound.sentTo( follower ), hasRaftLogEntries( entry1, entry2 ) ); } @Test @@ -250,7 +244,7 @@ public void shouldResendLastSentEntryOnFirstMismatch() throws Throwable logShipper.onMismatch( 1, new LeaderContext( 0, 0 ) ); // then - assertThat( outbound.sentTo( follower ), hasRaftLogEntries( entry2) ); + assertThat( outbound.sentTo( follower ), hasRaftLogEntries( entry2 ) ); } @Test @@ -313,7 +307,7 @@ public void shouldSendMostRecentlyAvailableEntryIfPruningHappened() throws IOExc //then assertTrue( outbound.hasAnyEntriesTo( follower ) ); - assertThat( outbound.sentTo( follower ), hasRaftLogEntries( entry3) ); + assertThat( outbound.sentTo( follower ), hasRaftLogEntries( entry3 ) ); } @Test @@ -371,7 +365,7 @@ public void run() //then assertTrue( outbound.hasAnyEntriesTo( follower ) ); assertThat( outbound.sentTo( follower ), - hasMessage( new RaftMessages.LogCompactionInfo<>( leader, 0, 1, storeId )) ); + hasMessage( new RaftMessages.LogCompactionInfo<>( leader, 0, 1 ) ) ); pruningThread.join(); } diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/roles/AppendingTest.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/roles/AppendingTest.java index da870c3de2c9f..9457cc7d127e5 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/roles/AppendingTest.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/roles/AppendingTest.java @@ -72,7 +72,7 @@ public void shouldPerformTruncation() throws Exception localTermForAllEntries, new RaftLogEntry[]{ new RaftLogEntry( localTermForAllEntries + 1, ReplicatedInteger.valueOf( 2 ) )}, - appendIndex + 3, storeId ), storeId ); + appendIndex + 3 ), storeId ); // then // we must produce a TruncateLogCommand at the earliest mismatching index @@ -103,7 +103,7 @@ public void shouldNotAllowTruncationAtCommit() throws Exception localTermForAllEntries, new RaftLogEntry[]{ new RaftLogEntry( localTermForAllEntries + 1, ReplicatedInteger.valueOf( 2 ) )}, - commitIndex + 3, storeId ), storeId ); + commitIndex + 3 ), storeId ); fail( "Appending should not allow truncation at or before the commit index" ); } catch ( IllegalStateException expected ) @@ -136,7 +136,7 @@ public void shouldNotAllowTruncationBeforeCommit() throws Exception localTermForAllEntries, new RaftLogEntry[]{ new RaftLogEntry( localTermForAllEntries + 1, ReplicatedInteger.valueOf( 2 ) )}, - commitIndex + 3, storeId ), storeId ); + commitIndex + 3 ), storeId ); fail( "Appending should not allow truncation at or before the commit index" ); } catch ( IllegalStateException expected ) @@ -174,7 +174,7 @@ public void shouldNotAttemptToTruncateAtIndexBeforeTheLogPrevIndex() throws Exce prevTerm, new RaftLogEntry[]{ new RaftLogEntry( prevTerm, ReplicatedInteger.valueOf( 2 ) )}, - commitIndex + 3, storeId ), storeId ); + commitIndex + 3 ), storeId ); // then // there should be no truncate commands. Actually, the whole thing should be a no op diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/roles/FollowerTest.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/roles/FollowerTest.java index 5b7c060825693..e755c75e857ec 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/roles/FollowerTest.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/roles/FollowerTest.java @@ -36,7 +36,6 @@ import org.neo4j.coreedge.raft.outcome.Outcome; import org.neo4j.coreedge.raft.state.RaftState; import org.neo4j.coreedge.server.RaftTestMember; -import org.neo4j.kernel.impl.store.StoreId; import org.neo4j.logging.Log; import org.neo4j.logging.NullLogProvider; @@ -152,14 +151,14 @@ public void shouldTruncateIfTermDoesNotMatch() throws Exception new RaftLogEntry[]{ new RaftLogEntry( 2, ContentGenerator.content() ), }, - -1, storeId.storeId() ), state, log(), storeId ) ); + -1 ), state, log(), storeId ) ); RaftLogEntry[] entries = { new RaftLogEntry( 1, new ReplicatedString( "commit this!" ) ), }; Outcome outcome = follower.handle( - new AppendEntries.Request<>( member1, 1, -1, -1, entries, -1, storeId.storeId() ), state, log(), storeId ); + new AppendEntries.Request<>( member1, 1, -1, -1, entries, -1 ), state, log(), storeId ); state.update( outcome ); // then @@ -182,7 +181,7 @@ public void followerLearningAboutHigherCommitCausesValuesTobeAppliedToItsLog() t // when receiving AppEntries with high leader commit (3) Outcome outcome = follower.handle( new AppendEntries.Request<>( myself, 0, 2, 0, - new RaftLogEntry[] { new RaftLogEntry( 0, ContentGenerator.content() ) }, 3, storeId.storeId() ), state, log(), + new RaftLogEntry[] { new RaftLogEntry( 0, ContentGenerator.content() ) }, 3 ), state, log(), storeId ); state.update( outcome ); @@ -242,7 +241,7 @@ public void shouldRenewElectionTimeoutOnReceiptOfHeartbeatInCurrentOrHigherTerm( Follower follower = new Follower(); - Outcome outcome = follower.handle( new RaftMessages.Heartbeat<>( myself, 1, 1, 1, storeId.storeId() ), + Outcome outcome = follower.handle( new RaftMessages.Heartbeat<>( myself, 1, 1, 1 ), state, log(), storeId ); // then @@ -260,7 +259,7 @@ public void shouldNotRenewElectionTimeoutOnReceiptOfHeartbeatInLowerTerm() throw Follower follower = new Follower(); - Outcome outcome = follower.handle( new RaftMessages.Heartbeat<>( myself, 1, 1, 1, storeId.storeId() ), + Outcome outcome = follower.handle( new RaftMessages.Heartbeat<>( myself, 1, 1, 1 ), state, log(), storeId ); // then @@ -275,13 +274,13 @@ private void appendSomeEntriesToLog( RaftState raft, Follower fo if ( i == 0 ) { raft.update( follower.handle( new AppendEntries.Request<>( myself, term, i - 1, -1, - new RaftLogEntry[] { new RaftLogEntry( term, ContentGenerator.content() ) }, -1, - storeId.storeId() ), raft, log(), storeId ) ); + new RaftLogEntry[] { new RaftLogEntry( term, ContentGenerator.content() ) }, -1 + ), raft, log(), storeId ) ); } else { raft.update( follower.handle( new AppendEntries.Request<>( myself, term, i - 1, term, - new RaftLogEntry[]{new RaftLogEntry( term, ContentGenerator.content() )}, -1, storeId.storeId() ), raft, + new RaftLogEntry[]{new RaftLogEntry( term, ContentGenerator.content() )}, -1 ), raft, log(), storeId ) ); } } diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/roles/LeaderTest.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/roles/LeaderTest.java index 078516a1d29f2..5e14c5f5767bd 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/roles/LeaderTest.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/roles/LeaderTest.java @@ -385,7 +385,7 @@ public void leaderShouldDecideToAppendToItsLogAndSendAppendEntriesMessageOnRecei Leader leader = new Leader(); RaftMessages.NewEntry.Request newEntryRequest = new RaftMessages.NewEntry.Request<>( - member( 9 ), CONTENT, localDatabase.storeId() ); + member( 9 ), CONTENT ); // when Outcome outcome = leader.handle( newEntryRequest, state, log(), localDatabase ); @@ -413,7 +413,7 @@ public void leaderShouldHandleBatch() throws Exception int BATCH_SIZE = 3; RaftMessages.NewEntry.Batch batchRequest = - new RaftMessages.NewEntry.Batch<>( BATCH_SIZE, localDatabase.storeId() ); + new RaftMessages.NewEntry.Batch<>( BATCH_SIZE ); batchRequest.add( valueOf( 0 ) ); batchRequest.add( valueOf( 1 ) ); batchRequest.add( valueOf( 2 ) ); @@ -461,7 +461,7 @@ public void leaderShouldCommitOnMajorityResponse() throws Exception // when a single instance responds (plus self == 2 out of 3 instances) Outcome outcome = leader.handle( - new RaftMessages.AppendEntries.Response<>( member1, 0, true, 0, 0, localDatabase.storeId() ), + new RaftMessages.AppendEntries.Response<>( member1, 0, true, 0, 0 ), state, log(), localDatabase ); // then @@ -490,7 +490,7 @@ public void leaderShouldCommitAllPreviouslyAppendedEntriesWhenCommittingLaterEnt // when Outcome outcome = - leader.handle( new AppendEntries.Response<>( member1, 0, true, 2, 2, localDatabase.storeId() ), + leader.handle( new AppendEntries.Response<>( member1, 0, true, 2, 2 ), state, log(), localDatabase ); state.update( outcome ); diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/state/explorer/action/NewEntry.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/state/explorer/action/NewEntry.java index bc872822754c3..177788d5f43f0 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/state/explorer/action/NewEntry.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/state/explorer/action/NewEntry.java @@ -27,7 +27,6 @@ import org.neo4j.coreedge.raft.ReplicatedString; import org.neo4j.coreedge.raft.RaftMessages; import org.neo4j.coreedge.raft.state.explorer.ClusterState; -import org.neo4j.kernel.impl.store.StoreId; public class NewEntry implements Action { @@ -44,7 +43,7 @@ public ClusterState advance( ClusterState previous ) throws IOException ClusterState newClusterState = new ClusterState( previous ); Queue> newQueue = new LinkedList<>( previous.queues.get( member ) ); newQueue.offer( new RaftMessages.NewEntry.Request( member, new ReplicatedString( - "content" ), new StoreId( 1, 2, 3, 4, 5 ) ) ); + "content" ) ) ); newClusterState.queues.put( member, newQueue ); return newClusterState; }