diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/storecopy/core/GetStoreRequestHandler.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/storecopy/core/GetStoreRequestHandler.java index 1f55fb8bb676..7f37d63ac2bb 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/storecopy/core/GetStoreRequestHandler.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/storecopy/core/GetStoreRequestHandler.java @@ -19,16 +19,16 @@ */ package org.neo4j.coreedge.catchup.storecopy.core; -import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.SimpleChannelInboundHandler; -import io.netty.handler.stream.ChunkedNioStream; - import java.io.File; import java.io.FileInputStream; import java.io.FileNotFoundException; import java.io.IOException; import java.util.function.Supplier; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.SimpleChannelInboundHandler; +import io.netty.handler.stream.ChunkedNioStream; + import org.neo4j.coreedge.catchup.CatchupServerProtocol; import org.neo4j.coreedge.catchup.ResponseMessageType; import org.neo4j.coreedge.catchup.storecopy.FileHeader; @@ -60,7 +60,8 @@ public GetStoreRequestHandler( CatchupServerProtocol protocol, @Override protected void channelRead0( ChannelHandlerContext ctx, GetStoreRequest msg ) throws Exception { - long lastCheckPointedTx = checkPointerSupplier.get().tryCheckPoint(new SimpleTriggerInfo("Store copy")); + System.out.println( "sending store files..." ); + long lastCheckPointedTx = checkPointerSupplier.get().tryCheckPoint( new SimpleTriggerInfo( "Store copy" ) ); sendFiles( ctx ); endStoreCopy( ctx, lastCheckPointedTx ); protocol.expect( NextMessage.MESSAGE_TYPE ); diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/convert/ConvertClassicStoreCommand.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/convert/ConvertClassicStoreCommand.java index 951037a63052..df738b48cb0e 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/convert/ConvertClassicStoreCommand.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/convert/ConvertClassicStoreCommand.java @@ -26,14 +26,12 @@ import org.neo4j.coreedge.raft.state.DurableStateStorageImporter; import org.neo4j.coreedge.raft.state.id_allocation.IdAllocationState; import org.neo4j.graphdb.factory.EnterpriseGraphDatabaseFactory; -import org.neo4j.graphdb.factory.GraphDatabaseFactory; import org.neo4j.io.fs.DefaultFileSystemAbstraction; import org.neo4j.kernel.api.exceptions.TransactionFailureException; import org.neo4j.kernel.impl.api.TransactionCommitProcess; import org.neo4j.kernel.impl.api.TransactionToApply; import org.neo4j.kernel.impl.core.DatabasePanicEventGenerator; import org.neo4j.kernel.impl.store.MetaDataStore; -import org.neo4j.kernel.impl.store.StoreFactory; import org.neo4j.kernel.impl.store.id.DefaultIdGeneratorFactory; import org.neo4j.kernel.impl.store.id.IdGenerator; import org.neo4j.kernel.impl.store.id.IdType; @@ -77,11 +75,11 @@ public ConvertClassicStoreCommand( File databaseDir ) public void execute() throws Throwable { - appendNullTransactionLogEntry( databaseDir ); + appendNullTransactionLogEntryToSetRaftIndexToMinusOne( databaseDir ); addIdAllocationState( databaseDir ); } - private void appendNullTransactionLogEntry( File dbDir) throws TransactionFailureException + private void appendNullTransactionLogEntryToSetRaftIndexToMinusOne( File dbDir) throws TransactionFailureException { GraphDatabaseAPI db = (GraphDatabaseAPI) new EnterpriseGraphDatabaseFactory().newEmbeddedDatabase( dbDir ); diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/roles/Leader.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/roles/Leader.java index e6061260f3cf..cad2caaeac41 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/roles/Leader.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/roles/Leader.java @@ -116,34 +116,34 @@ else if ( req.leaderTerm() == ctx.term() ) case APPEND_ENTRIES_RESPONSE: { - RaftMessages.AppendEntries.Response res = (RaftMessages.AppendEntries.Response) message; + RaftMessages.AppendEntries.Response response = (RaftMessages.AppendEntries.Response) message; - if ( res.term() < ctx.term() ) + if ( response.term() < ctx.term() ) { /* Ignore responses from old terms! */ break; } - else if ( res.term() > ctx.term() ) + else if ( response.term() > ctx.term() ) { - outcome.setNextTerm( res.term() ); + outcome.setNextTerm( response.term() ); outcome.steppingDown(); outcome.setNextRole( FOLLOWER ); outcome.replaceFollowerStates( new FollowerStates<>() ); break; } - FollowerState follower = ctx.followerStates().get( res.from() ); + FollowerState follower = ctx.followerStates().get( response.from() ); - if ( res.success() ) + if ( response.success() ) { - assert res.matchIndex() <= ctx.entryLog().appendIndex(); + assert response.matchIndex() <= ctx.entryLog().appendIndex(); - boolean followerProgressed = res.matchIndex() > follower.getMatchIndex(); + boolean followerProgressed = response.matchIndex() > follower.getMatchIndex(); - outcome.replaceFollowerStates( outcome.getFollowerStates().onSuccessResponse( res.from(), - max( res.matchIndex(), follower.getMatchIndex() ) ) ); + outcome.replaceFollowerStates( outcome.getFollowerStates().onSuccessResponse( response.from(), + max( response.matchIndex(), follower.getMatchIndex() ) ) ); - outcome.addShipCommand( new ShipCommand.Match( res.matchIndex(), res.from() ) ); + outcome.addShipCommand( new ShipCommand.Match( response.matchIndex(), response.from() ) ); /* * Matches from older terms can in complicated leadership change / log truncation scenarios @@ -152,7 +152,7 @@ else if ( res.term() > ctx.term() ) * and are ready for commit. * This is explained nicely in Figure 3.7 of the thesis */ - boolean matchInCurrentTerm = ctx.entryLog().readEntryTerm( res.matchIndex() ) == ctx.term(); + boolean matchInCurrentTerm = ctx.entryLog().readEntryTerm( response.matchIndex() ) == ctx.term(); /* * The quorum situation may have changed only if the follower actually progressed. @@ -173,16 +173,17 @@ else if ( res.term() > ctx.term() ) } else // Response indicated failure. { - if( res.appendIndex() >= ctx.entryLog().prevIndex() ) + if( response.appendIndex() >= ctx.entryLog().prevIndex() ) { // Signal a mismatch to the log shipper, which will serve an earlier entry. - outcome.addShipCommand( new ShipCommand.Mismatch( res.appendIndex(), res.from() ) ); + outcome.addShipCommand( new ShipCommand.Mismatch( response.appendIndex(), response.from() ) ); } else { + System.out.println("WOULD LIKE TO BE IN HERE"); // There are no earlier entries, message the follower that we have compacted so that // it can take appropriate action. - outcome.addOutgoingMessage( new RaftMessages.Directed<>( res.from(), + outcome.addOutgoingMessage( new RaftMessages.Directed<>( response.from(), new RaftMessages.LogCompactionInfo<>( ctx.myself(), ctx.term(), ctx.entryLog().prevIndex() ) ) ); } } diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/CoreEdgeClusterSettings.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/CoreEdgeClusterSettings.java index 3d6625d875fe..d0910cd9e445 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/CoreEdgeClusterSettings.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/CoreEdgeClusterSettings.java @@ -185,7 +185,7 @@ public String toString() @Description( "RAFT log rotation size" ) public static final Setting raft_log_rotation_size = - setting( "core_edge.raft_log_rotation_size", BYTES, "25M", min( 1024L ) ); + setting( "core_edge.raft_log_rotation_size", BYTES, "1M", min( 1024L ) ); @Description("RAFT meta data cache size (in unit of entries)") public static final Setting raft_log_meta_data_cache_size = diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/SenderService.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/SenderService.java index 774ebb4de5da..8ce659218a7a 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/SenderService.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/SenderService.java @@ -19,7 +19,6 @@ */ package org.neo4j.coreedge.server; -import org.neo4j.coreedge.network.Message; import java.util.Iterator; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.locks.ReadWriteLock; @@ -34,6 +33,7 @@ import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; +import org.neo4j.coreedge.network.Message; import org.neo4j.coreedge.raft.net.NonBlockingChannel; import org.neo4j.coreedge.raft.net.Outbound; import org.neo4j.coreedge.raft.net.monitoring.MessageQueueMonitor; @@ -49,7 +49,7 @@ public class SenderService extends LifecycleAdapter implements Outbound { private final Expiration expiration; - private final ConcurrentHashMap lazyChannelMap = + private final ConcurrentHashMap lazyChannelMap = new ConcurrentHashMap<>(); private final ExpiryScheduler scheduler; private final ChannelInitializer channelInitializer; @@ -68,7 +68,7 @@ public SenderService( ExpiryScheduler expiryScheduler, ChannelInitializer channelInitializer, LogProvider logProvider, Monitors monitors, - int maxQueueSize) + int maxQueueSize ) { this.expiration = expiration; this.scheduler = expiryScheduler; @@ -92,7 +92,7 @@ public void send( AdvertisedSocketAddress to, Message... messages ) MessageQueueMonitor monitor = monitors.newMonitor( MessageQueueMonitor.class, NonBlockingChannel.class ); TimestampedNonBlockingChannel lazyChannel = getAndUpdateLife( to, monitor ); NonBlockingChannel nonBlockingChannel = lazyChannel.get(); - monitor.register(to.socketAddress()); + monitor.register( to.socketAddress() ); for ( Object msg : messages ) { nonBlockingChannel.send( msg ); @@ -209,6 +209,7 @@ private synchronized void reapDeadChannels() { if ( timestampedChannel.getEndOfLife().expired() ) { + System.out.println("Reaping the channel: " + timestampedChannel); timestampedChannel.get().dispose(); itr.remove(); }