From 50c1b23402f7a1affe53cff0986a77c483a8fc84 Mon Sep 17 00:00:00 2001 From: Mark Needham Date: Thu, 30 Jun 2016 18:34:28 +0100 Subject: [PATCH] Tidying up the Cluster class to separate creating a cluster and starting it up. --- .../org/neo4j/coreedge/raft/RaftServer.java | 61 ++- .../raft/log/segmented/SegmentedRaftLog.java | 24 +- .../raft/membership/MembershipWaiter.java | 25 +- .../neo4j/coreedge/raft/roles/Appending.java | 6 +- .../neo4j/coreedge/raft/roles/Candidate.java | 4 +- .../neo4j/coreedge/raft/roles/Follower.java | 20 +- .../org/neo4j/coreedge/raft/roles/Heart.java | 5 +- .../org/neo4j/coreedge/raft/roles/Leader.java | 7 +- .../server/core/CoreServerStartupProcess.java | 15 +- .../core/EnterpriseCoreEditionModule.java | 2 +- .../org/neo4j/coreedge/ClusterIdentityIT.java | 82 ++-- .../java/org/neo4j/coreedge/TestStoreId.java | 8 +- .../neo4j/coreedge/backup/BackupCoreIT.java | 27 +- .../org/neo4j/coreedge/discovery/Cluster.java | 434 ++++++------------ .../neo4j/coreedge/discovery/CoreServer.java | 162 +++++++ .../neo4j/coreedge/discovery/EdgeServer.java | 109 +++++ .../raft/membership/MembershipWaiterTest.java | 9 +- .../coreedge/raft/roles/AppendingTest.java | 10 +- .../scenarios/ClusterDiscoveryIT.java | 4 +- .../scenarios/ClusterFormationIT.java | 19 +- .../coreedge/scenarios/ClusterOverviewIT.java | 2 +- .../coreedge/scenarios/ClusterShutdownIT.java | 5 +- .../scenarios/ConvertNonCoreEdgeStoreIT.java | 37 +- .../coreedge/scenarios/CoreEdgeRolesIT.java | 2 +- .../coreedge/scenarios/CorePruningIT.java | 10 +- .../scenarios/CoreServerReplicationIT.java | 27 +- .../scenarios/CoreToCoreCopySnapshotIT.java | 73 ++- .../scenarios/EdgeServerReplicationIT.java | 76 +-- .../scenarios/HazelcastClientLifeCycleIT.java | 116 ----- .../neo4j/coreedge/scenarios/RecoveryIT.java | 14 +- .../neo4j/coreedge/scenarios/RestartIT.java | 20 +- .../org/neo4j/test/coreedge/ClusterRule.java | 12 +- .../org/neo4j/metrics/CoreEdgeMetricsIT.java | 24 +- 33 files changed, 779 insertions(+), 672 deletions(-) create mode 100644 enterprise/core-edge/src/test/java/org/neo4j/coreedge/discovery/CoreServer.java create mode 100644 enterprise/core-edge/src/test/java/org/neo4j/coreedge/discovery/EdgeServer.java delete mode 100644 enterprise/core-edge/src/test/java/org/neo4j/coreedge/scenarios/HazelcastClientLifeCycleIT.java diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/RaftServer.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/RaftServer.java index 98377be587ed8..367a26e55d4c6 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/RaftServer.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/RaftServer.java @@ -33,9 +33,12 @@ import io.netty.handler.codec.LengthFieldBasedFrameDecoder; import io.netty.handler.codec.LengthFieldPrepender; +import java.util.ArrayList; +import java.util.List; import java.util.concurrent.TimeUnit; import org.neo4j.coreedge.catchup.storecopy.LocalDatabase; +import org.neo4j.coreedge.raft.membership.MembershipWaiter; import org.neo4j.coreedge.raft.net.Inbound; import org.neo4j.coreedge.raft.net.codecs.RaftMessageDecoder; import org.neo4j.coreedge.raft.replication.ReplicatedContent; @@ -44,10 +47,14 @@ import org.neo4j.coreedge.server.StoreId; import org.neo4j.coreedge.server.logging.ExceptionLoggingHandler; import org.neo4j.helpers.NamedThreadFactory; +import org.neo4j.kernel.impl.store.MismatchingStoreIdException; +import org.neo4j.kernel.impl.store.StoreFailureException; import org.neo4j.kernel.lifecycle.LifecycleAdapter; import org.neo4j.logging.Log; import org.neo4j.logging.LogProvider; +import static java.lang.String.format; + public class RaftServer extends LifecycleAdapter implements Inbound { private final ListenSocketAddress listenAddress; @@ -58,6 +65,7 @@ public class RaftServer extends LifecycleAdapter implements Inbound messageHandler; private EventLoopGroup workerGroup; private Channel channel; + private final List listeners = new ArrayList<>(); private final NamedThreadFactory threadFactory = new NamedThreadFactory( "raft-server" ); @@ -131,31 +139,60 @@ public void registerHandler( Inbound.MessageHandler ha this.messageHandler = handler; } + public void addMismatchedStoreListener( MismatchedStoreListener listener ) + { + listeners.add( listener ); + } + private class RaftMessageHandler extends SimpleChannelInboundHandler { @Override protected void channelRead0( ChannelHandlerContext channelHandlerContext, RaftMessages.StoreIdAwareMessage storeIdAwareMessage ) throws Exception { - RaftMessages.RaftMessage message = storeIdAwareMessage.message(); - StoreId storeId = storeIdAwareMessage.storeId(); - - if ( storeId.equals( localDatabase.storeId() ) ) + try { - messageHandler.handle( message ); - } - else - { - if ( localDatabase.isEmpty() ) + RaftMessages.RaftMessage message = storeIdAwareMessage.message(); + StoreId storeId = storeIdAwareMessage.storeId(); + + if ( storeId.equals( localDatabase.storeId() ) ) { - raftStateMachine.downloadSnapshot( message.from() ); + messageHandler.handle( message ); } else { - log.info( "Discarding message owing to mismatched storeId and non-empty store. Expected: %s, " + - "Encountered: %s", storeId, localDatabase.storeId() ); + if ( localDatabase.isEmpty() ) + { + raftStateMachine.downloadSnapshot( message.from() ); + } + else + { + log.info( "Discarding message owing to mismatched storeId and non-empty store. Expected: %s, " + + "Encountered: %s", storeId, localDatabase.storeId() ); + listeners.forEach( l -> { + MismatchedStoreIdException ex = new MismatchedStoreIdException( storeId, localDatabase.storeId() ); + l.onMismatchedStore( ex ); + } ); + } } } + catch ( Exception e ) + { + log.error( format( "Failed to process message %s", storeIdAwareMessage ), e ); + } + } + } + + public interface MismatchedStoreListener + { + void onMismatchedStore(MismatchedStoreIdException ex); + } + + public class MismatchedStoreIdException extends StoreFailureException + { + public MismatchedStoreIdException( StoreId expected, StoreId encountered ) + { + super( "Expected:" + expected + ", encountered:" + encountered ); } } } diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/segmented/SegmentedRaftLog.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/segmented/SegmentedRaftLog.java index 333e845dd0f02..0ca9cf49692c3 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/segmented/SegmentedRaftLog.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/segmented/SegmentedRaftLog.java @@ -33,6 +33,7 @@ import org.neo4j.cursor.IOCursor; import org.neo4j.io.fs.FileSystemAbstraction; import org.neo4j.kernel.lifecycle.LifecycleAdapter; +import org.neo4j.logging.Log; import org.neo4j.logging.LogProvider; import static java.lang.String.format; @@ -63,6 +64,7 @@ public class SegmentedRaftLog extends LifecycleAdapter implements RaftLog private final long rotateAtSize; private final ChannelMarshal contentMarshal; private final FileNames fileNames; + private final Log log; private boolean needsRecovery; private final LogProvider logProvider; @@ -87,6 +89,7 @@ public SegmentedRaftLog( FileSystemAbstraction fileSystem, File directory, long this.fileNames = new FileNames( directory ); this.readerPool = new ReaderPool( readerPoolSize, logProvider, fileNames, fileSystem, clock ); this.pruner = new SegmentedRaftLogPruner( pruningConfig, logProvider ); + this.log = logProvider.getLog( getClass() ); } @Override @@ -99,6 +102,7 @@ public synchronized void start() throws IOException, DamagedLogStorageException, RecoveryProtocol recoveryProtocol = new RecoveryProtocol( fileSystem, fileNames, readerPool, contentMarshal, logProvider, scanStats ); state = recoveryProtocol.run(); + log.info( "log started with recovered state %s", state ); } @Override @@ -216,11 +220,14 @@ public RaftLogCursor getEntryCursor( long fromIndex ) throws IOException @Override public synchronized long skip( long newIndex, long newTerm ) throws IOException { + log.info( "Skipping from {index: %d, term: %d} to {index: %d, term: %d}", state.appendIndex, state.currentTerm, newIndex, newTerm ); if ( state.appendIndex < newIndex ) { skipSegment( state.appendIndex, newIndex, newTerm ); state.prevTerm = newTerm; + state.currentTerm = newTerm; + state.prevIndex = newIndex; state.appendIndex = newIndex; } @@ -239,6 +246,7 @@ private RaftLogEntry readLogEntry( long logIndex ) throws IOException @Override public long readEntryTerm( long logIndex ) throws IOException { + log.info( "reading entry term at %d. prevIndex:%d, prevTerm:%d", logIndex, state.prevIndex, state.prevTerm ); if ( logIndex == state.prevIndex ) { return state.prevTerm; @@ -257,8 +265,20 @@ public long prune( long safeIndex ) throws IOException { long pruneIndex = pruner.getIndexToPruneFrom( safeIndex, state.segments ); SegmentFile oldestNotDisposed = state.segments.prune( pruneIndex ); - state.prevIndex = oldestNotDisposed.header().prevIndex(); - state.prevTerm = oldestNotDisposed.header().prevTerm(); + + long newPrevIndex = oldestNotDisposed.header().prevIndex(); + long newPrevTerm = oldestNotDisposed.header().prevTerm(); + + if ( newPrevIndex > state.prevIndex ) + { + state.prevIndex = newPrevIndex; + } + + if ( newPrevTerm > state.prevTerm ) + { + state.prevTerm = newPrevTerm; + } + return state.prevIndex; } } diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/membership/MembershipWaiter.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/membership/MembershipWaiter.java index c63db87af9208..0504683a94ea8 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/membership/MembershipWaiter.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/membership/MembershipWaiter.java @@ -22,8 +22,10 @@ import java.util.Set; import java.util.concurrent.CompletableFuture; +import org.neo4j.coreedge.raft.RaftServer; import org.neo4j.coreedge.raft.state.ReadableRaftState; import org.neo4j.coreedge.server.CoreMember; +import org.neo4j.kernel.impl.store.MismatchingStoreIdException; import org.neo4j.kernel.impl.util.JobScheduler; import org.neo4j.logging.Log; import org.neo4j.logging.LogProvider; @@ -51,13 +53,15 @@ public class MembershipWaiter private final CoreMember myself; private final JobScheduler jobScheduler; private final long maxCatchupLag; + private final RaftServer raftServer; private final Log log; - public MembershipWaiter( CoreMember myself, JobScheduler jobScheduler, long maxCatchupLag, LogProvider logProvider ) + public MembershipWaiter( CoreMember myself, JobScheduler jobScheduler, long maxCatchupLag, RaftServer raftServer, LogProvider logProvider ) { this.myself = myself; this.jobScheduler = jobScheduler; this.maxCatchupLag = maxCatchupLag; + this.raftServer = raftServer; this.log = logProvider.getLog( getClass() ); } @@ -65,16 +69,19 @@ public CompletableFuture waitUntilCaughtUpMember( ReadableRaftState raf { CompletableFuture catchUpFuture = new CompletableFuture<>(); + Evaluator evaluator = new Evaluator( raftState, catchUpFuture ); + raftServer.addMismatchedStoreListener( evaluator ); + JobScheduler.JobHandle jobHandle = jobScheduler.scheduleRecurring( new JobScheduler.Group( getClass().toString(), POOLED ), - new Evaluator( raftState, catchUpFuture ), maxCatchupLag, MILLISECONDS ); + evaluator, maxCatchupLag, MILLISECONDS ); catchUpFuture.whenComplete( ( result, e ) -> jobHandle.cancel( true ) ); return catchUpFuture; } - private class Evaluator implements Runnable + private class Evaluator implements Runnable, RaftServer.MismatchedStoreListener { private final ReadableRaftState raftState; private final CompletableFuture catchUpFuture; @@ -119,10 +126,8 @@ private boolean caughtUpWithLeader() lastLeaderCommit = raftState.leaderCommit(); if ( lastLeaderCommit != -1 ) { - log.info( "%s Catchup: %d => %d (%d behind)", - myself, - localCommit, lastLeaderCommit, - lastLeaderCommit - localCommit ); + long gap = lastLeaderCommit - localCommit; + log.info( "%s Catchup: %d => %d (%d behind)", myself, localCommit, lastLeaderCommit, gap ); } else { @@ -131,5 +136,11 @@ private boolean caughtUpWithLeader() return caughtUpWithLeader; } + + @Override + public void onMismatchedStore(RaftServer.MismatchedStoreIdException ex) + { + catchUpFuture.completeExceptionally( ex ); + } } } diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/roles/Appending.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/roles/Appending.java index 2cf1d5c9646ff..91c1994d4bfac 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/roles/Appending.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/roles/Appending.java @@ -31,11 +31,13 @@ import org.neo4j.coreedge.raft.outcome.TruncateLogCommand; import org.neo4j.coreedge.raft.replication.ReplicatedContent; import org.neo4j.coreedge.raft.state.ReadableRaftState; +import org.neo4j.logging.Log; public class Appending { static void handleAppendEntriesRequest( ReadableRaftState state, Outcome outcome, - RaftMessages.AppendEntries.Request request ) throws IOException + RaftMessages.AppendEntries.Request request, Log log ) throws IOException + { if ( request.leaderTerm() < state.term() ) { @@ -51,7 +53,7 @@ static void handleAppendEntriesRequest( ReadableRaftState state, Outcome outcome outcome.setLeader( request.from() ); outcome.setLeaderCommit( request.leaderCommit() ); - if ( !Follower.logHistoryMatches( state, request.prevLogIndex(), request.prevLogTerm() ) ) + if ( !Follower.logHistoryMatches( state, request.prevLogIndex(), request.prevLogTerm(), log ) ) { assert request.prevLogIndex() > -1 && request.prevLogTerm() > -1; RaftMessages.AppendEntries.Response appendResponse = new RaftMessages.AppendEntries.Response( diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/roles/Candidate.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/roles/Candidate.java index 90fc1501a54d1..9c4221781e3d0 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/roles/Candidate.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/roles/Candidate.java @@ -54,7 +54,7 @@ public Outcome handle( RaftMessages.RaftMessage message, ReadableRaftState ctx, outcome.setNextRole( FOLLOWER ); log.info( "Moving to FOLLOWER state after receiving heartbeat from %s at term %d (i am at %d)", req.from(), req.leaderTerm(), ctx.term() ); - Heart.beat( ctx, outcome, (RaftMessages.Heartbeat) message ); + Heart.beat( ctx, outcome, (RaftMessages.Heartbeat) message, log ); break; } @@ -75,7 +75,7 @@ public Outcome handle( RaftMessages.RaftMessage message, ReadableRaftState ctx, outcome.setNextRole( FOLLOWER ); log.info( "Moving to FOLLOWER state after receiving append entries request from %s at term %d (i am at %d)n", req.from(), req.leaderTerm(), ctx.term() ); - Appending.handleAppendEntriesRequest( ctx, outcome, req ); + Appending.handleAppendEntriesRequest( ctx, outcome, req, log ); break; } diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/roles/Follower.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/roles/Follower.java index 5c6afccef1914..0b0eaff9d8e8a 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/roles/Follower.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/roles/Follower.java @@ -35,7 +35,7 @@ class Follower implements RaftMessageHandler { - static boolean logHistoryMatches( ReadableRaftState ctx, long prevLogIndex, long prevLogTerm ) + static boolean logHistoryMatches( ReadableRaftState ctx, long prevLogIndex, long prevLogTerm, Log log ) throws IOException { // NOTE: A prevLogIndex before or at our log's prevIndex means that we @@ -44,8 +44,18 @@ static boolean logHistoryMatches( ReadableRaftState ctx, long prevLogIndex, long // NOTE: The entry term for a non existing log index is defined as -1, // so the history for a non existing log entry never matches. - return prevLogIndex <= ctx.entryLog().prevIndex() || - ctx.entryLog().readEntryTerm( prevLogIndex ) == prevLogTerm; + long logPrevIndex = ctx.entryLog().prevIndex(); + long logPrevTerm = ctx.entryLog().readEntryTerm( prevLogIndex ); + + boolean logHistoryMatches = prevLogIndex <= logPrevIndex || logPrevTerm == prevLogTerm; + + if ( !logHistoryMatches ) + { + log.info( "Log history mismatch: index:[%s, %s], term:[%s, %s]", + logPrevIndex, prevLogIndex, logPrevTerm, prevLogTerm ); + } + + return logHistoryMatches; } static void commitToLogOnUpdate( ReadableRaftState ctx, long indexOfLastNewEntry, long leaderCommit, Outcome outcome ) @@ -80,13 +90,13 @@ public Outcome handle( RaftMessages.RaftMessage message, ReadableRaftState ctx, { case HEARTBEAT: { - Heart.beat( ctx, outcome, (Heartbeat) message ); + Heart.beat( ctx, outcome, (Heartbeat) message, log ); break; } case APPEND_ENTRIES_REQUEST: { - Appending.handleAppendEntriesRequest( ctx, outcome, (AppendEntries.Request) message ); + Appending.handleAppendEntriesRequest( ctx, outcome, (AppendEntries.Request) message, log ); break; } diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/roles/Heart.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/roles/Heart.java index 5b235a8c266cb..010d24e7c1f29 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/roles/Heart.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/roles/Heart.java @@ -24,10 +24,11 @@ import org.neo4j.coreedge.raft.RaftMessages; import org.neo4j.coreedge.raft.outcome.Outcome; import org.neo4j.coreedge.raft.state.ReadableRaftState; +import org.neo4j.logging.Log; class Heart { - static void beat( ReadableRaftState state, Outcome outcome, RaftMessages.Heartbeat request ) throws IOException + static void beat( ReadableRaftState state, Outcome outcome, RaftMessages.Heartbeat request, Log log ) throws IOException { if ( request.leaderTerm() < state.term() ) { @@ -39,7 +40,7 @@ static void beat( ReadableRaftState state, Outcome outcome, RaftMessages.Heartb outcome.setLeader( request.from() ); outcome.setLeaderCommit( request.commitIndex() ); - if ( !Follower.logHistoryMatches( state, request.commitIndex(), request.commitIndexTerm() ) ) + if ( !Follower.logHistoryMatches( state, request.commitIndex(), request.commitIndexTerm(), log ) ) { return; } diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/roles/Leader.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/roles/Leader.java index 2551899c3e563..37656faee0371 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/roles/Leader.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/roles/Leader.java @@ -80,7 +80,7 @@ public Outcome handle( RaftMessages.RaftMessage message, ReadableRaftState ctx, outcome.setNextRole( FOLLOWER ); log.info( "Moving to FOLLOWER state after receiving heartbeat at term %d (my term is " + "%d) from %s", req.leaderTerm(), ctx.term(), req.from() ); - Heart.beat( ctx, outcome, (Heartbeat) message ); + Heart.beat( ctx, outcome, (Heartbeat) message, log ); break; } @@ -114,15 +114,14 @@ else if ( req.leaderTerm() == ctx.term() ) outcome.setNextRole( FOLLOWER ); log.info( "Moving to FOLLOWER state after receiving append request at term %d (my term is " + "%d) from %s", req.leaderTerm(), ctx.term(), req.from() ); - Appending.handleAppendEntriesRequest( ctx, outcome, req ); + Appending.handleAppendEntriesRequest( ctx, outcome, req, log ); break; } } case APPEND_ENTRIES_RESPONSE: { - RaftMessages.AppendEntries.Response response = - (RaftMessages.AppendEntries.Response) message; + RaftMessages.AppendEntries.Response response = (RaftMessages.AppendEntries.Response) message; if ( response.term() < ctx.term() ) { diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/core/CoreServerStartupProcess.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/core/CoreServerStartupProcess.java index 028ebfbaa21d6..0ee65b295b1b9 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/core/CoreServerStartupProcess.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/core/CoreServerStartupProcess.java @@ -20,6 +20,8 @@ package org.neo4j.coreedge.server.core; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeoutException; import org.neo4j.coreedge.catchup.CatchupServer; import org.neo4j.coreedge.raft.DelayedRenewableTimeoutService; @@ -52,7 +54,7 @@ public static LifeSupport createLifeSupport( DataSourceManager dataSourceManager services.add( raftServer ); services.add( catchupServer ); services.add( raftTimeoutService ); - services.add( new MembershipWaiterLifecycle( membershipWaiter, joinCatchupTimeout, raft, logProvider ) ); + services.add( new MembershipWaiterLifecycle( membershipWaiter, joinCatchupTimeout, raft, raftServer, logProvider ) ); return services; } @@ -62,14 +64,16 @@ private static class MembershipWaiterLifecycle extends LifecycleAdapter private final MembershipWaiter membershipWaiter; private final Long joinCatchupTimeout; private final RaftInstance raft; + private final RaftServer raftServer; private final Log log; private MembershipWaiterLifecycle( MembershipWaiter membershipWaiter, Long joinCatchupTimeout, - RaftInstance raft, LogProvider logProvider ) + RaftInstance raft, RaftServer raftServer, LogProvider logProvider ) { this.membershipWaiter = membershipWaiter; this.joinCatchupTimeout = joinCatchupTimeout; this.raft = raft; + this.raftServer = raftServer; this.log = logProvider.getLog( getClass() ); } @@ -82,7 +86,12 @@ public void start() throws Throwable { caughtUp.get( joinCatchupTimeout, MILLISECONDS ); } - catch ( Throwable e ) + catch(ExecutionException e) + { + log.error( "Server failed to join cluster", e.getCause() ); + throw e.getCause() ; + } + catch ( InterruptedException | TimeoutException e ) { String message = format( "Server failed to join cluster within catchup time limit [%d ms]", joinCatchupTimeout ); diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/core/EnterpriseCoreEditionModule.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/core/EnterpriseCoreEditionModule.java index f2bcbc7fbcf7a..272ef8296ebd5 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/core/EnterpriseCoreEditionModule.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/core/EnterpriseCoreEditionModule.java @@ -358,7 +358,7 @@ public void registerProcedures( Procedures procedures ) long electionTimeout = config.get( CoreEdgeClusterSettings.leader_election_timeout ); MembershipWaiter membershipWaiter = - new MembershipWaiter( myself, platformModule.jobScheduler, electionTimeout * 4, logProvider ); + new MembershipWaiter( myself, platformModule.jobScheduler, electionTimeout * 4, raftServer, logProvider ); ReplicatedIdGeneratorFactory replicatedIdGeneratorFactory = createIdGeneratorFactory( fileSystem, idRangeAcquirer, logProvider ); diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/ClusterIdentityIT.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/ClusterIdentityIT.java index 0e34ae084c1f9..2014f75165c0c 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/ClusterIdentityIT.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/ClusterIdentityIT.java @@ -32,19 +32,24 @@ import java.util.stream.Collectors; import org.neo4j.coreedge.discovery.Cluster; -import org.neo4j.coreedge.raft.state.CoreState; +import org.neo4j.coreedge.discovery.CoreServer; +import org.neo4j.coreedge.raft.RaftServer; import org.neo4j.coreedge.server.CoreEdgeClusterSettings; -import org.neo4j.coreedge.server.core.CoreGraphDatabase; import org.neo4j.graphdb.Node; import org.neo4j.io.fs.DefaultFileSystemAbstraction; import org.neo4j.io.pagecache.PageCache; -import org.neo4j.kernel.impl.factory.GraphDatabaseFacade; import org.neo4j.kernel.impl.pagecache.StandalonePageCacheFactory; import org.neo4j.kernel.impl.store.MetaDataStore; +import org.neo4j.kernel.impl.store.MismatchingStoreIdException; +import org.neo4j.kernel.lifecycle.LifecycleException; import org.neo4j.test.coreedge.ClusterRule; import org.neo4j.test.rule.SuppressOutput; +import static org.hamcrest.CoreMatchers.instanceOf; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import static org.neo4j.coreedge.TestStoreId.assertAllStoresHaveTheSameStoreId; import static org.neo4j.graphdb.Label.label; @@ -82,7 +87,7 @@ public void allServersShouldHaveTheSameStoreId() throws Throwable tx.success(); } ); - List coreStoreDirs = storeDirs( cluster.coreServers() ); + List coreStoreDirs = storeDirs( cluster.coreServers() ); cluster.shutdown(); @@ -105,7 +110,7 @@ public void whenWeRestartTheClusterAllServersShouldStillHaveTheSameStoreId() thr // WHEN cluster.start(); - List coreStoreDirs = storeDirs( cluster.coreServers() ); + List coreStoreDirs = storeDirs( cluster.coreServers() ); cluster.coreTx( ( db, tx ) -> { Node node = db.createNode( label( "boo" ) ); @@ -129,20 +134,21 @@ public void shouldNotJoinClusterIfHasDataWithDifferentStoreId() throws Exception tx.success(); } ); - String storeDir = cluster.getCoreServerById( 0 ).getStoreDir(); + File storeDir = cluster.getCoreServerById( 0 ).storeDir(); cluster.removeCoreServerWithServerId( 0 ); changeStoreId( storeDir ); // WHEN - Future future = cluster.asyncAddCoreServerWithServerId( 0, 3 ); - - cluster.awaitLeader(); - - // THEN - assertEquals( 2, cluster.healthyCoreMembers().size() ); - - future.cancel( true ); + try + { + cluster.addCoreServerWithServerId( 0, 3 ).start(); + fail( "Should not have joined the cluster" ); + } + catch ( RuntimeException e ) + { + assertThat(e.getCause(), instanceOf(LifecycleException.class)); + } } @Test @@ -159,20 +165,20 @@ public void laggingFollowerShouldDownloadSnapshot() throws Exception createSomeData( 100, cluster ); - for ( CoreGraphDatabase db : cluster.coreServers() ) + for ( CoreServer db : cluster.coreServers() ) { - getCoreState( db ).compact(); + db.coreState().compact(); } // WHEN - cluster.addCoreServerWithServerId( 0, 3 ); + cluster.addCoreServerWithServerId( 0, 3 ).start(); cluster.awaitLeader(); // THEN assertEquals( 3, cluster.healthyCoreMembers().size() ); - List coreStoreDirs = storeDirs( cluster.coreServers() ); + List coreStoreDirs = storeDirs( cluster.coreServers() ); cluster.shutdown(); assertAllStoresHaveTheSameStoreId( coreStoreDirs, fs ); } @@ -187,25 +193,27 @@ public void badFollowerShouldNotJoinCluster() throws Exception tx.success(); } ); - String storeDir = cluster.getCoreServerById( 0 ).getStoreDir(); + File storeDir = cluster.getCoreServerById( 0 ).storeDir(); cluster.removeCoreServerWithServerId( 0 ); changeStoreId( storeDir ); createSomeData( 100, cluster ); - for ( CoreGraphDatabase db : cluster.coreServers() ) + for ( CoreServer db : cluster.coreServers() ) { - getCoreState( db ).compact(); + db.coreState().compact(); } // WHEN - Future future = cluster.asyncAddCoreServerWithServerId( 0, 3 ); - cluster.awaitLeader(); - - // THEN - assertEquals( 2, cluster.healthyCoreMembers().size() ); - - future.cancel( true ); + try + { + cluster.addCoreServerWithServerId( 0, 3 ).start(); + fail( "Should not have joined the cluster" ); + } + catch ( RuntimeException e ) + { + assertThat(e.getCause(), instanceOf(LifecycleException.class)); + } } @Test @@ -220,27 +228,27 @@ public void aNewServerShouldJoinTheClusterByDownloadingASnapshot() throws Except createSomeData( 100, cluster ); - for ( CoreGraphDatabase db : cluster.coreServers() ) + for ( CoreServer db : cluster.coreServers() ) { - getCoreState( db ).compact(); + db.coreState().compact(); } // WHEN - cluster.addCoreServerWithServerId( 4, 4 ); + cluster.addCoreServerWithServerId( 4, 4 ).start(); cluster.awaitLeader(); // THEN assertEquals( 4, cluster.healthyCoreMembers().size() ); - List coreStoreDirs = storeDirs( cluster.coreServers() ); + List coreStoreDirs = storeDirs( cluster.coreServers() ); cluster.shutdown(); assertAllStoresHaveTheSameStoreId( coreStoreDirs, fs ); } - private List storeDirs( Collection dbs ) + private List storeDirs( Collection dbs ) { - return dbs.stream().map( GraphDatabaseFacade::getStoreDir ).collect( Collectors.toList() ); + return dbs.stream().map( CoreServer::storeDir ).collect( Collectors.toList() ); } private void createSomeData( int items, Cluster cluster ) throws TimeoutException, InterruptedException @@ -255,7 +263,7 @@ private void createSomeData( int items, Cluster cluster ) throws TimeoutExceptio } } - private void changeStoreId( String storeDir ) throws IOException + private void changeStoreId( File storeDir ) throws IOException { File neoStoreFile = new File( storeDir, MetaDataStore.DEFAULT_NAME ); try ( PageCache pageCache = StandalonePageCacheFactory.createPageCache( fs ) ) @@ -264,8 +272,4 @@ private void changeStoreId( String storeDir ) throws IOException } } - private CoreState getCoreState( CoreGraphDatabase db ) - { - return db.getDependencyResolver().resolveDependency( CoreState.class ); - } } diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/TestStoreId.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/TestStoreId.java index f1b7fbdc1ef5d..b31de87632659 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/TestStoreId.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/TestStoreId.java @@ -89,13 +89,13 @@ public String toString() '}'; } - public static void assertAllStoresHaveTheSameStoreId( List coreStoreDirs, FileSystemAbstraction fs ) + public static void assertAllStoresHaveTheSameStoreId( List coreStoreDirs, FileSystemAbstraction fs ) throws IOException { Set storeIds = new HashSet<>(); try ( PageCache pageCache = StandalonePageCacheFactory.createPageCache( fs ) ) { - for ( String coreStoreDir : coreStoreDirs ) + for ( File coreStoreDir : coreStoreDirs ) { storeIds.add( doReadStoreId( coreStoreDir, pageCache ) ); } @@ -103,7 +103,7 @@ public static void assertAllStoresHaveTheSameStoreId( List coreStoreDirs assertEquals( "Store Ids " + storeIds, 1, storeIds.size() ); } - public static TestStoreId readStoreId( String coreStoreDir, DefaultFileSystemAbstraction fs ) throws IOException + public static TestStoreId readStoreId( File coreStoreDir, DefaultFileSystemAbstraction fs ) throws IOException { try ( PageCache pageCache = StandalonePageCacheFactory.createPageCache( fs ) ) { @@ -111,7 +111,7 @@ public static TestStoreId readStoreId( String coreStoreDir, DefaultFileSystemAbs } } - private static TestStoreId doReadStoreId( String coreStoreDir, PageCache pageCache ) throws IOException + private static TestStoreId doReadStoreId( File coreStoreDir, PageCache pageCache ) throws IOException { File metadataStore = new File( coreStoreDir, MetaDataStore.DEFAULT_NAME ); diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/backup/BackupCoreIT.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/backup/BackupCoreIT.java index f6207d1bac5cf..b44101b8837af 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/backup/BackupCoreIT.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/backup/BackupCoreIT.java @@ -27,7 +27,6 @@ import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.Collection; -import java.util.LinkedList; import java.util.List; import java.util.concurrent.TimeoutException; import java.util.stream.Stream; @@ -35,6 +34,7 @@ import org.neo4j.backup.OnlineBackupSettings; import org.neo4j.coreedge.TestStoreId; import org.neo4j.coreedge.discovery.Cluster; +import org.neo4j.coreedge.discovery.CoreServer; import org.neo4j.coreedge.server.CoreEdgeClusterSettings; import org.neo4j.coreedge.server.core.CoreGraphDatabase; import org.neo4j.graphdb.Node; @@ -43,7 +43,6 @@ import org.neo4j.io.fs.DefaultFileSystemAbstraction; import org.neo4j.kernel.configuration.Config; import org.neo4j.kernel.configuration.Settings; -import org.neo4j.kernel.impl.factory.GraphDatabaseFacade; import org.neo4j.kernel.impl.store.format.standard.StandardV3_0; import org.neo4j.restore.RestoreClusterCliTest; import org.neo4j.restore.RestoreClusterUtils; @@ -106,12 +105,12 @@ public void makeSureBackupCanBePerformed() throws Throwable @Test public void makeSureBackupCanBePerformedFromAnyInstance() throws Throwable { - for ( CoreGraphDatabase db : cluster.coreServers() ) + for ( CoreServer db : cluster.coreServers() ) { // Run backup DbRepresentation beforeChange = DbRepresentation.of(createSomeData( cluster )); File backupPathPerCoreMachine = new File( backupPath, "" + db.id().hashCode() ); - String[] args = backupArguments(backupAddress(db), backupPathPerCoreMachine.getPath() ); + String[] args = backupArguments(backupAddress(db.database()), backupPathPerCoreMachine.getPath() ); assertEquals( 0, runBackupToolFromOtherJvmToGetExitCode( args ) ); // Add some new data @@ -135,7 +134,7 @@ public void makeSureCoreClusterCanBeRestoredFromABackup() throws Throwable // when we shutdown the cluster we lose the number of core servers so we won't go through the for loop unless // we capture the count beforehand - List dbPaths = cluster.coreServers().stream().map( GraphDatabaseFacade::getStoreDir ).collect( toList() ); + List dbPaths = cluster.coreServers().stream().map( CoreServer::storeDir ).collect( toList() ); int numberOfCoreServers = dbPaths.size(); cluster.shutdown(); @@ -143,15 +142,17 @@ public void makeSureCoreClusterCanBeRestoredFromABackup() throws Throwable TestStoreId storeId = TestStoreId.readStoreId( dbPaths.get( 0 ), fs ); // when - StringBuilder output = RestoreClusterUtils.execute( () -> RestoreNewClusterCli.main( toArray( args() - .homeDir( cluster.homeDir( 0 ) ).config( cluster.homeDir( 0 ) ).from( backupPath ) - .database( "graph.db" ).force().build() ) ) ); + StringBuilder output = RestoreClusterUtils.execute( () -> { + File homeDir = cluster.getCoreServerById( 0 ).homeDir(); + RestoreNewClusterCli.main( toArray( args().homeDir( homeDir ).config( homeDir ).from( backupPath ) + .database( "graph.db" ).force().build() ) ); + } ); String seed = RestoreClusterCliTest.extractSeed( output ); for ( int i = 1; i < numberOfCoreServers; i++ ) { - File homeDir = cluster.homeDir( i ); + File homeDir = cluster.getCoreServerById( i ).homeDir(); RestoreClusterUtils.execute( () -> RestoreExistingClusterCli.main( toArray( args().homeDir( homeDir ) .config( homeDir ).from( backupPath ).database( "graph.db" ).seed( seed ).force().build() ) ) ); } @@ -159,11 +160,11 @@ public void makeSureCoreClusterCanBeRestoredFromABackup() throws Throwable cluster.start(); // then - Collection coreGraphDatabases = cluster.coreServers(); - Stream dbRepresentations = coreGraphDatabases.stream().map( DbRepresentation::of ); + Collection coreGraphDatabases = cluster.coreServers(); + Stream dbRepresentations = coreGraphDatabases.stream().map( x -> DbRepresentation.of(x.database()) ); dbRepresentations.forEach( afterReSeed -> assertEquals( beforeBackup, afterReSeed ) ); - List afterRestoreDbPaths = coreGraphDatabases.stream().map( GraphDatabaseFacade::getStoreDir ).collect( toList() ); + List afterRestoreDbPaths = coreGraphDatabases.stream().map( CoreServer::storeDir ).collect( toList() ); cluster.shutdown(); assertAllStoresHaveTheSameStoreId( afterRestoreDbPaths, fs ); @@ -177,7 +178,7 @@ static CoreGraphDatabase createSomeData( Cluster cluster ) throws TimeoutExcepti Node node = db.createNode( label( "boo" ) ); node.setProperty( "foobar", "baz_bat" ); tx.success(); - } ); + } ).database(); } static String backupAddress(CoreGraphDatabase db) { diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/discovery/Cluster.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/discovery/Cluster.java index c4259086f4e06..a9ab3235c75cc 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/discovery/Cluster.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/discovery/Cluster.java @@ -32,7 +32,6 @@ import java.util.concurrent.ExecutorCompletionService; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.Future; import java.util.concurrent.TimeoutException; import java.util.concurrent.locks.LockSupport; import java.util.function.BiConsumer; @@ -49,18 +48,14 @@ import org.neo4j.coreedge.server.edge.EdgeGraphDatabase; import org.neo4j.graphdb.Transaction; import org.neo4j.graphdb.TransactionFailureException; -import org.neo4j.graphdb.factory.GraphDatabaseSettings; -import org.neo4j.kernel.GraphDatabaseDependencies; import org.neo4j.kernel.configuration.Config; -import org.neo4j.kernel.impl.factory.GraphDatabaseFacade; import org.neo4j.kernel.impl.store.format.standard.StandardV3_0; import org.neo4j.kernel.internal.DatabaseHealth; -import org.neo4j.logging.Level; import org.neo4j.storageengine.api.lock.AcquireLockTimeoutException; import static java.util.Collections.emptyMap; import static java.util.concurrent.TimeUnit.MILLISECONDS; -import static java.util.stream.Collectors.joining; + import static org.neo4j.concurrent.Futures.combine; import static org.neo4j.helpers.collection.Iterables.firstOrNull; import static org.neo4j.helpers.collection.MapUtil.stringMap; @@ -68,48 +63,35 @@ public class Cluster { - private static final String CLUSTER_NAME = "core-neo4j"; private static final int DEFAULT_TIMEOUT_MS = 15_000; private static final int DEFAULT_BACKOFF_MS = 100; private final File parentDir; - private final Map coreParams; - private Map> instanceCoreParams; - private final Map edgeParams; - private final Map> instanceEdgeParams; - private final String recordFormat; private final DiscoveryServiceFactory discoveryServiceFactory; - private final int noOfCoreServers; - private final int noOfEdgeServers; - private Map coreServers = new ConcurrentHashMap<>(); - private Map edgeServers = new ConcurrentHashMap<>(); + private Map coreServers = new ConcurrentHashMap<>(); + private Map edgeServers = new ConcurrentHashMap<>(); - public Cluster( File parentDir, int noOfCoreServers, int noOfEdgeServers, DiscoveryServiceFactory fatory, + public Cluster( File parentDir, int noOfCoreServers, int noOfEdgeServers, + DiscoveryServiceFactory discoveryServiceFactory, Map coreParams, Map> instanceCoreParams, Map edgeParams, Map> instanceEdgeParams, String recordFormat ) - throws ExecutionException, InterruptedException { - this.noOfCoreServers = noOfCoreServers; - this.noOfEdgeServers = noOfEdgeServers; - this.discoveryServiceFactory = fatory; + this.discoveryServiceFactory = discoveryServiceFactory; this.parentDir = parentDir; - this.coreParams = coreParams; - this.instanceCoreParams = instanceCoreParams; - this.edgeParams = edgeParams; - this.instanceEdgeParams = instanceEdgeParams; - this.recordFormat = recordFormat; + List initialHosts = buildAddresses( noOfCoreServers ); + createCoreServers( noOfCoreServers, initialHosts, coreParams, instanceCoreParams, recordFormat ); + createEdgeServers( noOfEdgeServers, initialHosts, edgeParams, instanceEdgeParams, recordFormat ); } public void start() throws InterruptedException, ExecutionException { - List initialHosts = buildAddresses( noOfCoreServers ); ExecutorService executor = Executors.newCachedThreadPool(); try { - startCoreServers( executor, noOfCoreServers, initialHosts, coreParams, instanceCoreParams, recordFormat ); - startEdgeServers( executor, noOfEdgeServers, initialHosts, edgeParams, instanceEdgeParams, recordFormat ); + startCoreServers( executor ); + startEdgeServers( executor ); } finally { @@ -117,174 +99,43 @@ public void start() throws InterruptedException, ExecutionException } } - public File coreServerStoreDirectory( int serverId ) - { - return coreServerStoreDirectory( homeDir( serverId ) ); - } - - public File homeDir( int serverId ) - { - return new File( parentDir, "server-core-" + serverId ); - } - - public static File coreServerStoreDirectory( File neo4jHome ) - { - return new File( new File( new File( neo4jHome, "data" ), "databases" ), "graph.db" ); - } - - public static File edgeServerStoreDirectory( File parentDir, int serverId ) + public Set healthyCoreMembers() { - return new File( parentDir, "server-edge-" + serverId ); - } - - private Map serverParams( String serverType, int serverId, String initialHosts ) - { - Map params = stringMap(); - params.put( "dbms.mode", serverType ); - params.put( GraphDatabaseSettings.store_internal_log_level.name(), Level.DEBUG.name() ); - params.put( CoreEdgeClusterSettings.cluster_name.name(), CLUSTER_NAME ); - params.put( CoreEdgeClusterSettings.initial_core_cluster_members.name(), initialHosts ); - return params; + return coreServers.values().stream() + .filter( db -> db.database().getDependencyResolver().resolveDependency( DatabaseHealth.class ).isHealthy() ) + .collect( Collectors.toSet() ); } - private static List buildAddresses( int noOfCoreServers ) + public CoreServer getCoreServerById( int serverId ) { - List addresses = new ArrayList<>(); - for ( int i = 0; i < noOfCoreServers; i++ ) - { - int port = 5000 + i; - addresses.add( new AdvertisedSocketAddress( "localhost:" + port ) ); - } - return addresses; + return coreServers.get( serverId ); } - private void startCoreServers( ExecutorService executor, final int noOfCoreServers, - List addresses, Map extraParams, - Map> instanceExtraParams, String recordFormat ) - throws InterruptedException, ExecutionException + public EdgeGraphDatabase getEdgeServerById( int serverId ) { - CompletionService ecs = new ExecutorCompletionService<>( executor ); - - for ( int i = 0; i < noOfCoreServers; i++ ) - { - final int serverId = i; - ecs.submit( () -> { - CoreGraphDatabase coreServer = startCoreServer( serverId, noOfCoreServers, addresses, extraParams, - instanceExtraParams, recordFormat ); - return coreServers.put( serverId, coreServer ); - } ); - } - - for ( int i = 0; i < noOfCoreServers; i++ ) - { - ecs.take().get(); - } + return edgeServers.get( serverId ).database(); } - private void startEdgeServers( ExecutorService executor, int noOfEdgeServers, - final List addresses, - Map extraParams, - Map> instanceExtraParams, - String recordFormat ) - throws InterruptedException, ExecutionException + public CoreServer addCoreServerWithServerId( int serverId, int intendedClusterSize ) { - CompletionService ecs = new ExecutorCompletionService<>( executor ); - - for ( int i = 0; i < noOfEdgeServers; i++ ) - { - final int serverId = i; - ecs.submit( () -> startEdgeServer( serverId, addresses, extraParams, instanceExtraParams, recordFormat ) ); - } - - for ( int i = 0; i < noOfEdgeServers; i++ ) - { - this.edgeServers.put( i, ecs.take().get() ); - } + return addCoreServerWithServerId( serverId, intendedClusterSize, stringMap(), emptyMap(), StandardV3_0.NAME ); } - private CoreGraphDatabase startCoreServer( int serverId, int clusterSize, List addresses, - Map extraParams, - Map> instanceExtraParams, - String recordFormat ) + public EdgeServer addEdgeServerWithIdAndRecordFormat( int serverId, String recordFormat ) { - int clusterPort = 5000 + serverId; - int txPort = 6000 + serverId; - int raftPort = 7000 + serverId; - int boltPort = 8000 + serverId; - - String initialHosts = addresses.stream().map( AdvertisedSocketAddress::toString ).collect( joining( "," ) ); - - final Map params = serverParams( "CORE", serverId, initialHosts ); - - params.put( GraphDatabaseSettings.record_format.name(), recordFormat ); - - params.put( CoreEdgeClusterSettings.cluster_listen_address.name(), "localhost:" + clusterPort ); - - params.put( CoreEdgeClusterSettings.transaction_advertised_address.name(), "localhost:" + txPort ); - params.put( CoreEdgeClusterSettings.transaction_listen_address.name(), "127.0.0.1:" + txPort ); - params.put( CoreEdgeClusterSettings.raft_advertised_address.name(), "localhost:" + raftPort ); - params.put( CoreEdgeClusterSettings.raft_listen_address.name(), "127.0.0.1:" + raftPort ); - - params.put( new GraphDatabaseSettings.BoltConnector( "bolt" ).type.name(), "BOLT" ); - params.put( new GraphDatabaseSettings.BoltConnector( "bolt" ).enabled.name(), "true" ); - params.put( new GraphDatabaseSettings.BoltConnector( "bolt" ).address.name(), "0.0.0.0:" + boltPort ); - - params.put( GraphDatabaseSettings.bolt_advertised_address.name(), "127.0.0.1:" + boltPort ); - - params.put( CoreEdgeClusterSettings.expected_core_cluster_size.name(), String.valueOf( clusterSize ) ); - params.put( GraphDatabaseSettings.pagecache_memory.name(), "8m" ); - params.put( GraphDatabaseSettings.auth_store.name(), new File( parentDir, "auth" ).getAbsolutePath() ); - - params.putAll( extraParams ); - - for ( Map.Entry> entry : instanceExtraParams.entrySet() ) - { - params.put( entry.getKey(), entry.getValue().apply( serverId ) ); - } + Config config = firstOrNull( coreServers.values() ).database().getDependencyResolver().resolveDependency( Config.class ); + List advertisedAddresses = + config.get( CoreEdgeClusterSettings.initial_core_cluster_members ); - File neo4jHome = new File( parentDir, "server-core-" + serverId ); - final File storeDir = coreServerStoreDirectory( neo4jHome ); - params.put( GraphDatabaseSettings.logs_directory.name(), new File(neo4jHome, "logs").getAbsolutePath() ); - return new CoreGraphDatabase( storeDir, params, GraphDatabaseDependencies.newDependencies(), - discoveryServiceFactory ); + EdgeServer server = new EdgeServer( parentDir, serverId, discoveryServiceFactory, advertisedAddresses, + stringMap(), emptyMap(), recordFormat ); + edgeServers.put( serverId, server ); + return server; } - private EdgeGraphDatabase startEdgeServer( int serverId, List addresses, - Map extraParams, - Map> instanceExtraParams, - String recordFormat ) + public EdgeServer addEdgeServerWithId( int serverId ) { - final File storeDir = edgeServerStoreDirectory( parentDir, serverId ); - return startEdgeServer( serverId, storeDir, addresses, extraParams, instanceExtraParams, recordFormat ); - } - - private EdgeGraphDatabase startEdgeServer( int serverId, File storeDir, List addresses, - Map extraParams, - Map> instanceExtraParams, - String recordFormat ) - { - String initialHosts = addresses.stream().map( AdvertisedSocketAddress::toString ).collect( joining( "," ) ); - - final Map params = serverParams( "EDGE", serverId, initialHosts ); - params.put( GraphDatabaseSettings.record_format.name(), recordFormat ); - params.put( GraphDatabaseSettings.pagecache_memory.name(), "8m" ); - params.put( GraphDatabaseSettings.auth_store.name(), new File( parentDir, "auth" ).getAbsolutePath() ); - params.put( GraphDatabaseSettings.logs_directory.name(), storeDir.getAbsolutePath() ); - - params.putAll( extraParams ); - - for ( Map.Entry> entry : instanceExtraParams.entrySet() ) - { - params.put( entry.getKey(), entry.getValue().apply( serverId ) ); - } - params.put( new GraphDatabaseSettings.BoltConnector( "bolt" ).type.name(), "BOLT" ); - params.put( new GraphDatabaseSettings.BoltConnector( "bolt" ).enabled.name(), "true" ); - params.put( new GraphDatabaseSettings.BoltConnector( "bolt" ).address.name(), "0.0.0.0:" + (9000 + serverId) ); - - params.put( GraphDatabaseSettings.bolt_advertised_address.name(), "127.0.0.1:" + (9000 + serverId) ); - - return new EdgeGraphDatabase( storeDir, params, GraphDatabaseDependencies.newDependencies(), - discoveryServiceFactory ); + return addEdgeServerWithIdAndRecordFormat( serverId, StandardV3_0.NAME ); } public void shutdown() throws ExecutionException, InterruptedException @@ -297,7 +148,7 @@ public void shutdownCoreServers() throws InterruptedException, ExecutionExceptio { ExecutorService executor = Executors.newCachedThreadPool(); List> serverShutdownSuppliers = new ArrayList<>(); - for ( final CoreGraphDatabase coreServer : coreServers.values() ) + for ( final CoreServer coreServer : coreServers.values() ) { serverShutdownSuppliers.add( () -> { coreServer.shutdown(); @@ -312,35 +163,16 @@ public void shutdownCoreServers() throws InterruptedException, ExecutionExceptio finally { executor.shutdown(); - coreServers.clear(); } } - private void shutdownEdgeServers() - { - for ( EdgeGraphDatabase edgeServer : edgeServers.values() ) - { - edgeServer.shutdown(); - } - edgeServers.clear(); - } - - public CoreGraphDatabase getCoreServerById( int serverId ) - { - return coreServers.get( serverId ); - } - - public EdgeGraphDatabase getEdgeServerById( int serverId ) - { - return edgeServers.get( serverId ); - } - public void removeCoreServerWithServerId( int serverId ) { - CoreGraphDatabase serverToRemove = getCoreServerById( serverId ); + CoreServer serverToRemove = getCoreServerById( serverId ); if ( serverToRemove != null ) { + serverToRemove.shutdown(); removeCoreServer( serverToRemove ); } else @@ -349,87 +181,32 @@ public void removeCoreServerWithServerId( int serverId ) } } - public void removeCoreServer( CoreGraphDatabase serverToRemove ) + public void removeCoreServer( CoreServer serverToRemove ) { serverToRemove.shutdown(); coreServers.values().remove( serverToRemove ); } - public void addCoreServerWithServerId( int serverId, int intendedClusterSize ) - { - addCoreServerWithServerId( serverId, intendedClusterSize, stringMap(), emptyMap(), StandardV3_0.NAME ); - } - - public Future asyncAddCoreServerWithServerId( int serverId, int intendedClusterSize ) - { - ExecutorService executor = Executors.newCachedThreadPool(); - return executor.submit( () -> { - addCoreServerWithServerId( serverId, intendedClusterSize, stringMap(), emptyMap(), - StandardV3_0.NAME ); - executor.shutdown(); - } - ); - } - - private void addCoreServerWithServerId( int serverId, int intendedClusterSize, Map extraParams, - Map> instanceExtraParams, String recordFormat ) - { - Config config = firstOrNull( coreServers.values() ) - .getDependencyResolver().resolveDependency( Config.class ); - List advertisedAddress = - config.get( CoreEdgeClusterSettings.initial_core_cluster_members ); - - coreServers.put( serverId, startCoreServer( serverId, intendedClusterSize, advertisedAddress, extraParams, - instanceExtraParams, recordFormat ) ); - } - - public void addEdgeServerWithFileLocation( int serverId, String recordFormat ) - { - Config config = firstOrNull( coreServers.values() ).getDependencyResolver().resolveDependency( Config.class ); - List advertisedAddresses = - config.get( CoreEdgeClusterSettings.initial_core_cluster_members ); - - edgeServers.put( serverId, - startEdgeServer( serverId, advertisedAddresses, stringMap(), emptyMap(), recordFormat ) ); - } - - public void addEdgeServerWithId( int serverId ) - { - addEdgeServerWithFileLocation( serverId, StandardV3_0.NAME ); - } - - public int serverIdFor( GraphDatabaseFacade graphDatabaseFacade ) - { - for ( Map.Entry entry : coreServers.entrySet() ) - { - if (entry.getValue() == graphDatabaseFacade) - { - return entry.getKey(); - } - } - throw new IllegalArgumentException( "No such database found." ); - } - - public Collection coreServers() + public Collection coreServers() { return coreServers.values(); } - public Collection edgeServers() + public Collection edgeServers() { return edgeServers.values(); } - public EdgeGraphDatabase findAnEdgeServer() + public EdgeServer findAnEdgeServer() { return firstOrNull( edgeServers.values() ); } - public CoreGraphDatabase getDbWithRole( Role role ) + public CoreServer getDbWithRole( Role role ) { - for ( CoreGraphDatabase coreServer : coreServers.values() ) + for ( CoreServer coreServer : coreServers.values() ) { - if ( coreServer.getRole().equals( role ) ) + if ( coreServer.database() != null && coreServer.database().getRole().equals( role ) ) { return coreServer; } @@ -437,21 +214,21 @@ public CoreGraphDatabase getDbWithRole( Role role ) return null; } - public CoreGraphDatabase awaitLeader() throws TimeoutException + public CoreServer awaitLeader() throws TimeoutException { return awaitCoreGraphDatabaseWithRole( DEFAULT_TIMEOUT_MS, Role.LEADER ); } - public CoreGraphDatabase awaitLeader( long timeoutMillis ) throws TimeoutException + public CoreServer awaitLeader( long timeoutMillis ) throws TimeoutException { return awaitCoreGraphDatabaseWithRole( timeoutMillis, Role.LEADER ); } - public CoreGraphDatabase awaitCoreGraphDatabaseWithRole( long timeoutMillis, Role role ) throws TimeoutException + public CoreServer awaitCoreGraphDatabaseWithRole( long timeoutMillis, Role role ) throws TimeoutException { long endTimeMillis = timeoutMillis + System.currentTimeMillis(); - CoreGraphDatabase db; + CoreServer db; while ( (db = getDbWithRole( role )) == null && (System.currentTimeMillis() < endTimeMillis) ) { LockSupport.parkNanos( MILLISECONDS.toNanos( 100 ) ); @@ -466,52 +243,55 @@ public CoreGraphDatabase awaitCoreGraphDatabaseWithRole( long timeoutMillis, Rol public int numberOfCoreServers() { - CoreGraphDatabase aCoreGraphDb = coreServers.values().iterator().next(); - CoreTopologyService coreTopologyService = aCoreGraphDb.getDependencyResolver() + CoreServer aCoreGraphDb = coreServers.values().stream() + .filter( ( server ) -> server.database() != null ).findAny().get(); + CoreTopologyService coreTopologyService = aCoreGraphDb.database().getDependencyResolver() .resolveDependency( CoreTopologyService.class ); return coreTopologyService.currentTopology().coreMembers().size(); } - public void addEdgeServerWithFileLocation( File edgeDatabaseStoreFileLocation ) - { - Config config = - coreServers.values().iterator().next().getDependencyResolver().resolveDependency( Config.class ); - List advertisedAddresses = - config.get( CoreEdgeClusterSettings.initial_core_cluster_members ); - - edgeServers.put( 999, - startEdgeServer( 999, edgeDatabaseStoreFileLocation, advertisedAddresses, stringMap(), emptyMap(), - StandardV3_0.NAME ) ); - } - /** * Perform a transaction against the core cluster, selecting the target and retrying as necessary. */ - public CoreGraphDatabase coreTx( BiConsumer op ) + public CoreServer coreTx( BiConsumer op ) throws TimeoutException, InterruptedException { // this currently wraps the leader-only strategy, since it is the recommended and only approach return leaderTx( op ); } + private CoreServer addCoreServerWithServerId( int serverId, int intendedClusterSize, Map extraParams, + Map> instanceExtraParams, String recordFormat ) + { + Config config = firstOrNull( coreServers.values() ).database().getDependencyResolver().resolveDependency( Config.class ); + List advertisedAddress = config.get( CoreEdgeClusterSettings.initial_core_cluster_members ); + + CoreServer coreServer = new CoreServer( serverId, intendedClusterSize, advertisedAddress, + discoveryServiceFactory, recordFormat, parentDir, + extraParams, instanceExtraParams ); + coreServers.put( serverId, coreServer ); + return coreServer; + } + /** * Perform a transaction against the leader of the core cluster, retrying as necessary. */ - private CoreGraphDatabase leaderTx( BiConsumer op ) + private CoreServer leaderTx( BiConsumer op ) throws TimeoutException, InterruptedException { long endTime = System.currentTimeMillis() + DEFAULT_TIMEOUT_MS; do { - CoreGraphDatabase db = awaitCoreGraphDatabaseWithRole( DEFAULT_TIMEOUT_MS, Role.LEADER ); + CoreServer server = awaitCoreGraphDatabaseWithRole( DEFAULT_TIMEOUT_MS, Role.LEADER ); + CoreGraphDatabase db = server.database(); try { Transaction tx = db.beginTx(); op.accept( db, tx ); tx.close(); - return db; + return server; } catch ( Throwable e ) { @@ -554,10 +334,84 @@ private boolean isLockExpired( Throwable e ) LockSessionExpired; } - public Set healthyCoreMembers() + private static List buildAddresses( int noOfCoreServers ) { - return coreServers.values().stream() - .filter( db -> db.getDependencyResolver().resolveDependency( DatabaseHealth.class ).isHealthy() ) - .collect( Collectors.toSet() ); + List addresses = new ArrayList<>(); + for ( int i = 0; i < noOfCoreServers; i++ ) + { + int port = 5000 + i; + addresses.add( new AdvertisedSocketAddress( "localhost:" + port ) ); + } + return addresses; } + + private void createCoreServers( final int noOfCoreServers, + List addresses, Map extraParams, + Map> instanceExtraParams, String recordFormat ) + { + + for ( int i = 0; i < noOfCoreServers; i++ ) + { + CoreServer coreServer = new CoreServer( i, noOfCoreServers, addresses, discoveryServiceFactory, + recordFormat, parentDir, + extraParams, instanceExtraParams ); + coreServers.put( i, coreServer ); + } + } + + private void startCoreServers( ExecutorService executor ) throws InterruptedException, ExecutionException + { + CompletionService ecs = new ExecutorCompletionService<>( executor ); + + for ( CoreServer coreServer : coreServers.values() ) + { + ecs.submit( () -> { + coreServer.start(); + return coreServer.database(); + } ); + } + + for ( int i = 0; i < coreServers.size(); i++ ) + { + ecs.take().get(); + } + } + + private void startEdgeServers( ExecutorService executor ) throws InterruptedException, ExecutionException + { + CompletionService ecs = new ExecutorCompletionService<>( executor ); + + for ( EdgeServer edgeServer : edgeServers.values() ) + { + ecs.submit( () -> { + edgeServer.start(); + return edgeServer.database(); + } ); + } + + for ( int i = 0; i < edgeServers.size(); i++ ) + { + ecs.take().get(); + } + } + + private void createEdgeServers( int noOfEdgeServers, + final List addresses, + Map extraParams, + Map> instanceExtraParams, + String recordFormat ) + { + + for ( int i = 0; i < noOfEdgeServers; i++ ) + { + edgeServers.put( i, new EdgeServer( parentDir, i, discoveryServiceFactory, addresses, + extraParams, instanceExtraParams, recordFormat ) ); + } + } + + private void shutdownEdgeServers() + { + edgeServers.values().forEach( EdgeServer::shutdown ); + } + } diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/discovery/CoreServer.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/discovery/CoreServer.java new file mode 100644 index 0000000000000..f2e93fe009c7a --- /dev/null +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/discovery/CoreServer.java @@ -0,0 +1,162 @@ +/* + * Copyright (c) 2002-2016 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package org.neo4j.coreedge.discovery; + +import java.io.File; +import java.util.List; +import java.util.Map; +import java.util.SortedMap; +import java.util.function.IntFunction; + +import org.neo4j.coreedge.raft.RaftInstance; +import org.neo4j.coreedge.raft.log.segmented.FileNames; +import org.neo4j.coreedge.raft.state.CoreState; +import org.neo4j.coreedge.server.AdvertisedSocketAddress; +import org.neo4j.coreedge.server.CoreEdgeClusterSettings; +import org.neo4j.coreedge.server.CoreMember; +import org.neo4j.coreedge.server.core.CoreGraphDatabase; +import org.neo4j.graphdb.factory.GraphDatabaseSettings; +import org.neo4j.io.fs.DefaultFileSystemAbstraction; +import org.neo4j.kernel.GraphDatabaseDependencies; +import org.neo4j.logging.Level; + +import static java.util.stream.Collectors.joining; + +import static org.neo4j.coreedge.raft.log.segmented.SegmentedRaftLog.SEGMENTED_LOG_DIRECTORY_NAME; +import static org.neo4j.coreedge.server.core.EnterpriseCoreEditionModule.CLUSTER_STATE_DIRECTORY_NAME; +import static org.neo4j.helpers.collection.MapUtil.stringMap; + +public class CoreServer +{ + private final File neo4jHome; + private final DiscoveryServiceFactory discoveryServiceFactory; + private final File storeDir; + private final Map config; + private final int serverId; + private CoreGraphDatabase database; + + public static final String CLUSTER_NAME = "core-neo4j"; + + public CoreServer( int serverId, int clusterSize, + List addresses, + DiscoveryServiceFactory discoveryServiceFactory, + String recordFormat, + File parentDir, + Map extraParams, + Map> instanceExtraParams) + { + this.serverId = serverId; + int clusterPort = 5000 + serverId; + int txPort = 6000 + serverId; + int raftPort = 7000 + serverId; + int boltPort = 8000 + serverId; + + String initialMembers = addresses.stream().map( AdvertisedSocketAddress::toString ).collect( joining( "," ) ); + + Map params = stringMap(); + params.put( "dbms.mode", "CORE" ); + params.put( GraphDatabaseSettings.store_internal_log_level.name(), Level.DEBUG.name() ); + params.put( CoreEdgeClusterSettings.cluster_name.name(), CLUSTER_NAME ); + params.put( CoreEdgeClusterSettings.initial_core_cluster_members.name(), initialMembers ); + params.put( GraphDatabaseSettings.record_format.name(), recordFormat ); + params.put( CoreEdgeClusterSettings.cluster_listen_address.name(), "localhost:" + clusterPort ); + params.put( CoreEdgeClusterSettings.transaction_advertised_address.name(), "localhost:" + txPort ); + params.put( CoreEdgeClusterSettings.transaction_listen_address.name(), "127.0.0.1:" + txPort ); + params.put( CoreEdgeClusterSettings.raft_advertised_address.name(), "localhost:" + raftPort ); + params.put( CoreEdgeClusterSettings.raft_listen_address.name(), "127.0.0.1:" + raftPort ); + params.put( new GraphDatabaseSettings.BoltConnector( "bolt" ).type.name(), "BOLT" ); + params.put( new GraphDatabaseSettings.BoltConnector( "bolt" ).enabled.name(), "true" ); + params.put( new GraphDatabaseSettings.BoltConnector( "bolt" ).address.name(), "0.0.0.0:" + boltPort ); + params.put( GraphDatabaseSettings.bolt_advertised_address.name(), "127.0.0.1:" + boltPort ); + params.put( CoreEdgeClusterSettings.expected_core_cluster_size.name(), String.valueOf( clusterSize ) ); + params.put( GraphDatabaseSettings.pagecache_memory.name(), "8m" ); + params.put( GraphDatabaseSettings.auth_store.name(), new File( parentDir, "auth" ).getAbsolutePath() ); + params.putAll( extraParams ); + + for ( Map.Entry> entry : instanceExtraParams.entrySet() ) + { + params.put( entry.getKey(), entry.getValue().apply( serverId ) ); + } + + this.neo4jHome = new File( parentDir, "server-core-" + serverId ); + + params.put( GraphDatabaseSettings.logs_directory.name(), new File(neo4jHome, "logs").getAbsolutePath() ); + + this.config = params; + this.discoveryServiceFactory = discoveryServiceFactory; + storeDir = new File( new File( new File( neo4jHome, "data" ), "databases" ), "graph.db" ); + storeDir.mkdirs(); + } + + public void start() + { + database = new CoreGraphDatabase( storeDir, config, + GraphDatabaseDependencies.newDependencies(), discoveryServiceFactory ); + } + + public void shutdown() + { + if ( database != null ) + { + database.shutdown(); + database = null; + } + } + + public CoreGraphDatabase database() + { + return database; + } + + public File storeDir() + { + return storeDir; + } + + public CoreState coreState() + { + return database.getDependencyResolver().resolveDependency( CoreState.class ); + } + + public CoreMember id() + { + return database.getDependencyResolver().resolveDependency( RaftInstance.class ).identity(); + } + + public SortedMap getLogFileNames( ) + { + File clusterStateDir = new File( storeDir, CLUSTER_STATE_DIRECTORY_NAME ); + File logFilesDir = new File( clusterStateDir, SEGMENTED_LOG_DIRECTORY_NAME ); + return new FileNames( logFilesDir ).getAllFiles( new DefaultFileSystemAbstraction(), null ); + } + + public File homeDir() + { + return neo4jHome; + } + + @Override + public String toString() + { + return "CoreServer{" + + "serverId=" + serverId + + '}'; + } +} diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/discovery/EdgeServer.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/discovery/EdgeServer.java new file mode 100644 index 0000000000000..95606bcbafc19 --- /dev/null +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/discovery/EdgeServer.java @@ -0,0 +1,109 @@ +/* + * Copyright (c) 2002-2016 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package org.neo4j.coreedge.discovery; + +import java.io.File; +import java.util.List; +import java.util.Map; +import java.util.function.IntFunction; + +import org.neo4j.coreedge.server.AdvertisedSocketAddress; +import org.neo4j.coreedge.server.CoreEdgeClusterSettings; +import org.neo4j.coreedge.server.edge.EdgeGraphDatabase; +import org.neo4j.graphdb.factory.GraphDatabaseSettings; +import org.neo4j.kernel.GraphDatabaseDependencies; +import org.neo4j.logging.Level; + +import static java.util.stream.Collectors.joining; + +import static org.neo4j.helpers.collection.MapUtil.stringMap; + +public class EdgeServer +{ + private final File neo4jHome; + private final Map config; + private final DiscoveryServiceFactory discoveryServiceFactory; + private final File storeDir; + private EdgeGraphDatabase database; + + public EdgeServer( File parentDir, int serverId, DiscoveryServiceFactory discoveryServiceFactory, + List addresses, + Map extraParams, + Map> instanceExtraParams, + String recordFormat ) + { + String initialHosts = addresses.stream().map( AdvertisedSocketAddress::toString ).collect( joining( "," ) ); + + Map params = stringMap(); + params.put( "dbms.mode", "EDGE" ); + params.put( GraphDatabaseSettings.store_internal_log_level.name(), Level.DEBUG.name() ); + params.put( CoreEdgeClusterSettings.cluster_name.name(), CoreServer.CLUSTER_NAME ); + params.put( CoreEdgeClusterSettings.initial_core_cluster_members.name(), initialHosts ); + params.put( GraphDatabaseSettings.record_format.name(), recordFormat ); + params.put( GraphDatabaseSettings.pagecache_memory.name(), "8m" ); + params.put( GraphDatabaseSettings.auth_store.name(), new File( parentDir, "auth" ).getAbsolutePath() ); + params.putAll( extraParams ); + + for ( Map.Entry> entry : instanceExtraParams.entrySet() ) + { + params.put( entry.getKey(), entry.getValue().apply( serverId ) ); + } + + params.put( new GraphDatabaseSettings.BoltConnector( "bolt" ).type.name(), "BOLT" ); + params.put( new GraphDatabaseSettings.BoltConnector( "bolt" ).enabled.name(), "true" ); + params.put( new GraphDatabaseSettings.BoltConnector( "bolt" ).address.name(), "0.0.0.0:" + (9000 + serverId) ); + params.put( GraphDatabaseSettings.bolt_advertised_address.name(), "127.0.0.1:" + (9000 + serverId) ); + + File neo4jHome = new File( parentDir, "server-edge-" + serverId ); + params.put( GraphDatabaseSettings.logs_directory.name(), new File( neo4jHome, "logs" ).getAbsolutePath() ); + + this.neo4jHome = neo4jHome; + this.config = params; + this.discoveryServiceFactory = discoveryServiceFactory; + storeDir = new File( new File( new File( neo4jHome, "data" ), "databases" ), "graph.db" ); + storeDir.mkdirs(); + } + + public void start() + { + database = new EdgeGraphDatabase( storeDir, config, + GraphDatabaseDependencies.newDependencies(), discoveryServiceFactory ); + } + + public void shutdown() + { + if ( database != null ) + { + database.shutdown(); + } + database = null; + } + + public EdgeGraphDatabase database() + { + return database; + } + + public File storeDir() + { + return storeDir; + } + +} diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/membership/MembershipWaiterTest.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/membership/MembershipWaiterTest.java index d09751033c4d6..7b3e719f24e74 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/membership/MembershipWaiterTest.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/membership/MembershipWaiterTest.java @@ -24,6 +24,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeoutException; +import org.neo4j.coreedge.raft.RaftServer; import org.neo4j.coreedge.raft.log.InMemoryRaftLog; import org.neo4j.coreedge.raft.log.RaftLogEntry; import org.neo4j.coreedge.raft.state.RaftState; @@ -34,6 +35,8 @@ import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.concurrent.TimeUnit.NANOSECONDS; import static org.junit.Assert.fail; +import static org.mockito.Mockito.mock; + import static org.neo4j.coreedge.raft.ReplicatedInteger.valueOf; import static org.neo4j.coreedge.server.RaftTestMember.member; @@ -44,7 +47,7 @@ public void shouldReturnImmediatelyIfMemberAndCaughtUp() throws Exception { OnDemandJobScheduler jobScheduler = new OnDemandJobScheduler(); MembershipWaiter waiter = new MembershipWaiter( member( 0 ), jobScheduler, 500, - NullLogProvider.getInstance() ); + mock(RaftServer.class), NullLogProvider.getInstance() ); InMemoryRaftLog raftLog = new InMemoryRaftLog(); raftLog.append( new RaftLogEntry( 0, valueOf( 0 ) ) ); @@ -66,7 +69,7 @@ public void shouldTimeoutIfCaughtUpButNotMember() throws Exception { OnDemandJobScheduler jobScheduler = new OnDemandJobScheduler(); MembershipWaiter waiter = new MembershipWaiter( member( 0 ), jobScheduler, 1, - NullLogProvider.getInstance()); + mock(RaftServer.class), NullLogProvider.getInstance()); RaftState raftState = RaftStateBuilder.raftState() .votingMembers( member( 1 ) ) @@ -93,7 +96,7 @@ public void shouldTimeoutIfMemberButNotCaughtUp() throws Exception { OnDemandJobScheduler jobScheduler = new OnDemandJobScheduler(); MembershipWaiter waiter = new MembershipWaiter( member( 0 ), jobScheduler, 1, - NullLogProvider.getInstance() ); + mock(RaftServer.class), NullLogProvider.getInstance() ); RaftState raftState = RaftStateBuilder.raftState() .votingMembers( member( 0 ), member( 1 ) ) diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/roles/AppendingTest.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/roles/AppendingTest.java index c9d141180ffda..747b2c6412931 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/roles/AppendingTest.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/roles/AppendingTest.java @@ -32,6 +32,8 @@ import org.neo4j.coreedge.raft.outcome.TruncateLogCommand; import org.neo4j.coreedge.raft.state.ReadableRaftState; import org.neo4j.coreedge.server.CoreMember; +import org.neo4j.logging.NullLog; +import org.neo4j.logging.NullLogProvider; import static org.junit.Assert.fail; import static org.mockito.Matchers.any; @@ -71,7 +73,7 @@ public void shouldPerformTruncation() throws Exception localTermForAllEntries, new RaftLogEntry[]{ new RaftLogEntry( localTermForAllEntries + 1, ReplicatedInteger.valueOf( 2 ) )}, - appendIndex + 3 ) ); + appendIndex + 3 ), NullLog.getInstance() ); // then // we must produce a TruncateLogCommand at the earliest mismatching index @@ -102,7 +104,7 @@ public void shouldNotAllowTruncationAtCommit() throws Exception localTermForAllEntries, new RaftLogEntry[]{ new RaftLogEntry( localTermForAllEntries + 1, ReplicatedInteger.valueOf( 2 ) )}, - commitIndex + 3 ) ); + commitIndex + 3 ), NullLog.getInstance() ); fail( "Appending should not allow truncation at or before the commit index" ); } catch ( IllegalStateException expected ) @@ -135,7 +137,7 @@ public void shouldNotAllowTruncationBeforeCommit() throws Exception localTermForAllEntries, new RaftLogEntry[]{ new RaftLogEntry( localTermForAllEntries + 1, ReplicatedInteger.valueOf( 2 ) )}, - commitIndex + 3 ) ); + commitIndex + 3 ), NullLog.getInstance() ); fail( "Appending should not allow truncation at or before the commit index" ); } catch ( IllegalStateException expected ) @@ -173,7 +175,7 @@ public void shouldNotAttemptToTruncateAtIndexBeforeTheLogPrevIndex() throws Exce prevTerm, new RaftLogEntry[]{ new RaftLogEntry( prevTerm, ReplicatedInteger.valueOf( 2 ) )}, - commitIndex + 3 ) ); + commitIndex + 3 ), NullLog.getInstance() ); // then // there should be no truncate commands. Actually, the whole thing should be a no op diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/scenarios/ClusterDiscoveryIT.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/scenarios/ClusterDiscoveryIT.java index cc0a3f18d38e8..acda76d7f5df8 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/scenarios/ClusterDiscoveryIT.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/scenarios/ClusterDiscoveryIT.java @@ -62,7 +62,7 @@ public void shouldDiscoverCoreClusterMembers() throws Exception List currentMembers; for ( int i = 0; i < 3; i++ ) { - currentMembers = discoverClusterMembers( cluster.getCoreServerById( i ) ); + currentMembers = discoverClusterMembers( cluster.getCoreServerById( i ).database() ); assertThat( currentMembers, containsInAnyOrder( new Object[]{"127.0.0.1:8000"}, new Object[]{"127.0.0.1:8001"}, @@ -81,7 +81,7 @@ public void shouldFindReadAndWriteServers() throws Exception List currentMembers; for ( int i = 0; i < 3; i++ ) { - currentMembers = endPoints( cluster.getCoreServerById( i ) ); + currentMembers = endPoints( cluster.getCoreServerById( i ).database() ); assertEquals(1, currentMembers.stream().filter( x -> x[1].equals( "write" ) ).count()); assertEquals(1, currentMembers.stream().filter( x -> x[1].equals( "read" ) ).count()); diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/scenarios/ClusterFormationIT.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/scenarios/ClusterFormationIT.java index 225da0bd66d89..66e4f459e0f50 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/scenarios/ClusterFormationIT.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/scenarios/ClusterFormationIT.java @@ -53,8 +53,8 @@ public void setup() throws Exception public void shouldBeAbleToAddAndRemoveCoreServers() throws Exception { // when - cluster.removeCoreServerWithServerId( 0 ); - cluster.addCoreServerWithServerId( 0, 3 ); + cluster.getCoreServerById( 0 ).shutdown(); + cluster.getCoreServerById( 0 ).start(); // then assertEquals( 3, cluster.numberOfCoreServers() ); @@ -66,7 +66,7 @@ public void shouldBeAbleToAddAndRemoveCoreServers() throws Exception assertEquals( 2, cluster.numberOfCoreServers() ); // when - cluster.addCoreServerWithServerId( 4, 3 ); + cluster.addCoreServerWithServerId( 4, 3 ).start(); // then assertEquals( 3, cluster.numberOfCoreServers() ); @@ -78,7 +78,7 @@ public void shouldBeAbleToAddAndRemoveCoreServersUnderModestLoad() throws Except // given ExecutorService executorService = Executors.newSingleThreadExecutor(); executorService.submit( () -> { - CoreGraphDatabase leader = cluster.getDbWithRole( Role.LEADER ); + CoreGraphDatabase leader = cluster.getDbWithRole( Role.LEADER ).database(); try ( Transaction tx = leader.beginTx() ) { leader.createNode(); @@ -87,20 +87,20 @@ public void shouldBeAbleToAddAndRemoveCoreServersUnderModestLoad() throws Except } ); // when - cluster.removeCoreServerWithServerId( 0 ); - cluster.addCoreServerWithServerId( 0, 3 ); + cluster.getCoreServerById( 0 ).shutdown(); + cluster.getCoreServerById( 0 ).start(); // then assertEquals( 3, cluster.numberOfCoreServers() ); // when - cluster.removeCoreServerWithServerId( 1 ); + cluster.removeCoreServerWithServerId( 0 ); // then assertEquals( 2, cluster.numberOfCoreServers() ); // when - cluster.addCoreServerWithServerId( 4, 3 ); + cluster.addCoreServerWithServerId( 4, 3 ).start(); // then assertEquals( 3, cluster.numberOfCoreServers() ); @@ -123,7 +123,8 @@ public void shouldBeAbleToRestartTheCluster() throws Exception // when cluster.removeCoreServerWithServerId( 1 ); - cluster.addCoreServerWithServerId( 3, 3 ); + + cluster.addCoreServerWithServerId( 3, 3 ).start(); cluster.shutdown(); cluster.start(); diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/scenarios/ClusterOverviewIT.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/scenarios/ClusterOverviewIT.java index 52f3edff78e29..a612d95b27499 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/scenarios/ClusterOverviewIT.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/scenarios/ClusterOverviewIT.java @@ -59,7 +59,7 @@ public void shouldDiscoverCoreClusterMembers() throws Exception List overview; for ( int i = 0; i < 3; i++ ) { - overview = clusterOverview( cluster.getCoreServerById( i ) ); + overview = clusterOverview( cluster.getCoreServerById( i ).database() ); assertThat( overview, containsRole( "leader", 1 ) ); assertThat( overview, containsRole( "follower", 2 ) ); diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/scenarios/ClusterShutdownIT.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/scenarios/ClusterShutdownIT.java index 1623439a42e00..bc44a4189dbed 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/scenarios/ClusterShutdownIT.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/scenarios/ClusterShutdownIT.java @@ -65,8 +65,9 @@ public void shouldShutdownEvenThoughWaitingForLock() throws Exception for ( int victimId = 0; victimId < cluster.numberOfCoreServers(); victimId++ ) { - assertTrue( cluster.getCoreServerById( victimId ).isAvailable( 1000 ) ); + assertTrue( cluster.getCoreServerById( victimId ).database().isAvailable( 1000 ) ); shouldShutdownEvenThoughWaitingForLock0( cluster, victimId, shutdownOrder ); + cluster.start(); } } @@ -86,7 +87,7 @@ private void shouldShutdownEvenThoughWaitingForLock0( Cluster cluster, int victi { // when - blocking on lock acquiring final AtomicReference someNode = new AtomicReference<>(); - final GraphDatabaseService victimDB = cluster.getCoreServerById( victimId ); + final GraphDatabaseService victimDB = cluster.getCoreServerById( victimId ).database(); try ( Transaction tx = victimDB.beginTx() ) { diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/scenarios/ConvertNonCoreEdgeStoreIT.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/scenarios/ConvertNonCoreEdgeStoreIT.java index 72334fb6997af..57c0fbcd4e200 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/scenarios/ConvertNonCoreEdgeStoreIT.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/scenarios/ConvertNonCoreEdgeStoreIT.java @@ -19,20 +19,17 @@ */ package org.neo4j.coreedge.scenarios; +import java.io.File; +import java.util.Arrays; +import java.util.Collection; + import org.junit.Rule; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; -import java.io.File; -import java.io.IOException; -import java.util.Arrays; -import java.util.Collection; - -import org.neo4j.coreedge.convert.ConversionVerifier; -import org.neo4j.coreedge.convert.ConvertClassicStoreToCoreCommand; -import org.neo4j.coreedge.convert.GenerateClusterSeedCommand; import org.neo4j.coreedge.discovery.Cluster; +import org.neo4j.coreedge.discovery.CoreServer; import org.neo4j.coreedge.server.core.CoreGraphDatabase; import org.neo4j.function.ThrowingSupplier; import org.neo4j.graphdb.GraphDatabaseService; @@ -42,26 +39,23 @@ import org.neo4j.graphdb.Transaction; import org.neo4j.graphdb.factory.GraphDatabaseFactory; import org.neo4j.graphdb.factory.GraphDatabaseSettings; -import org.neo4j.io.fs.DefaultFileSystemAbstraction; import org.neo4j.kernel.configuration.Config; import org.neo4j.kernel.impl.store.format.highlimit.HighLimit; import org.neo4j.kernel.impl.store.format.standard.StandardV3_0; import org.neo4j.restore.RestoreClusterCliTest; import org.neo4j.restore.RestoreClusterUtils; -import org.neo4j.restore.RestoreDatabaseCommand; import org.neo4j.restore.RestoreExistingClusterCli; import org.neo4j.restore.RestoreNewClusterCli; import org.neo4j.test.coreedge.ClusterRule; import static java.util.concurrent.TimeUnit.SECONDS; + import static org.hamcrest.Matchers.greaterThan; import static org.junit.Assert.assertEquals; -import static org.neo4j.coreedge.discovery.Cluster.coreServerStoreDirectory; + import static org.neo4j.coreedge.server.CoreEdgeClusterSettings.raft_advertised_address; -import static org.neo4j.dbms.DatabaseManagementSystemSettings.database_path; import static org.neo4j.graphdb.Label.label; import static org.neo4j.helpers.collection.Iterables.count; -import static org.neo4j.helpers.collection.MapUtil.stringMap; import static org.neo4j.restore.ArgsBuilder.args; import static org.neo4j.restore.ArgsBuilder.toArray; import static org.neo4j.test.assertion.Assert.assertEventually; @@ -93,9 +87,10 @@ public void shouldReplicateTransactionToCoreServers() throws Throwable File dbDir = clusterRule.testDirectory().cleanDirectory( "classic-db" ); File classicNeo4jStore = createClassicNeo4jStore( dbDir, 10, recordFormat ); - File clusterDirectory = clusterRule.clusterDirectory(); + Cluster cluster = this.clusterRule.withRecordFormat( recordFormat ).createCluster(); + + File homeDir = cluster.getCoreServerById( 0 ).homeDir(); - File homeDir = new File( clusterDirectory, "server-core-" + 0 ) ; StringBuilder output = RestoreClusterUtils.execute( () -> RestoreNewClusterCli.main( toArray( args() .homeDir( homeDir ).config( homeDir ).from( classicNeo4jStore ) .database( "graph.db" ).force().build() ) ) ); @@ -104,15 +99,15 @@ public void shouldReplicateTransactionToCoreServers() throws Throwable for ( int serverId = 1; serverId < CLUSTER_SIZE; serverId++ ) { - File destination = new File( clusterDirectory, "server-core-" + serverId ) ; + File destination = cluster.getCoreServerById( serverId ).homeDir(); RestoreClusterUtils.execute( () -> RestoreExistingClusterCli.main( toArray( args().homeDir( destination ) .config( destination ).from( classicNeo4jStore ).database( "graph.db" ).seed( seed ).force().build() ) ) ); } - Cluster cluster = clusterRule.withRecordFormat( recordFormat ).startCluster(); + cluster.start(); // when - GraphDatabaseService coreDB = cluster.awaitLeader( 5000 ); + CoreGraphDatabase coreDB = cluster.awaitLeader( 5000 ).database(); try ( Transaction tx = coreDB.beginTx() ) { @@ -121,11 +116,13 @@ public void shouldReplicateTransactionToCoreServers() throws Throwable tx.success(); } - cluster.addEdgeServerWithFileLocation( 4, recordFormat ); + cluster.addEdgeServerWithIdAndRecordFormat( 4, recordFormat ).start(); // then - for ( final CoreGraphDatabase db : cluster.coreServers() ) + for ( final CoreServer server : cluster.coreServers() ) { + CoreGraphDatabase db = server.database(); + try ( Transaction tx = db.beginTx() ) { ThrowingSupplier nodeCount = () -> count( db.getAllNodes() ); diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/scenarios/CoreEdgeRolesIT.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/scenarios/CoreEdgeRolesIT.java index 7e86ba32ec16a..8b91e3f15d140 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/scenarios/CoreEdgeRolesIT.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/scenarios/CoreEdgeRolesIT.java @@ -44,7 +44,7 @@ public void edgeServersShouldRefuseWrites() throws Exception { // given Cluster cluster = clusterRule.startCluster(); - GraphDatabaseService db = cluster.findAnEdgeServer(); + GraphDatabaseService db = cluster.findAnEdgeServer().database(); Transaction tx = db.beginTx(); db.createNode(); diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/scenarios/CorePruningIT.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/scenarios/CorePruningIT.java index df0c83ab35d45..ca3424e92b46b 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/scenarios/CorePruningIT.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/scenarios/CorePruningIT.java @@ -26,8 +26,8 @@ import java.util.concurrent.TimeUnit; import org.neo4j.coreedge.discovery.Cluster; +import org.neo4j.coreedge.discovery.CoreServer; import org.neo4j.coreedge.server.CoreEdgeClusterSettings; -import org.neo4j.coreedge.server.core.CoreGraphDatabase; import org.neo4j.coreedge.server.core.EnterpriseCoreEditionModule; import org.neo4j.test.coreedge.ClusterRule; @@ -53,7 +53,7 @@ public void actuallyDeletesTheFiles() throws Exception // given Cluster cluster = clusterRule.startCluster(); - CoreGraphDatabase coreGraphDatabase = null; + CoreServer coreGraphDatabase = null; int txs = 10; for ( int i = 0; i < txs; i++ ) { @@ -64,7 +64,7 @@ public void actuallyDeletesTheFiles() throws Exception } // when pruning kicks in then some files are actually deleted - File storeDir = new File( coreGraphDatabase.getStoreDir() ); + File storeDir = coreGraphDatabase.storeDir(); int expectedNumberOfLogFilesAfterPruning = 2; assertEventually( "raft logs eventually pruned", () -> numberOfFiles( storeDir ), equalTo( expectedNumberOfLogFilesAfterPruning ), 1, TimeUnit.SECONDS ); @@ -77,7 +77,7 @@ public void shouldNotPruneUncommittedEntries() throws Exception // given Cluster cluster = clusterRule.startCluster(); - CoreGraphDatabase coreGraphDatabase = null; + CoreServer coreGraphDatabase = null; int txs = 1000; for ( int i = 0; i < txs; i++ ) { @@ -87,7 +87,7 @@ public void shouldNotPruneUncommittedEntries() throws Exception } // when pruning kicks in then some files are actually deleted - File storeDir = new File( coreGraphDatabase.getStoreDir() ); + File storeDir = coreGraphDatabase.storeDir(); int expectedNumberOfLogFilesAfterPruning = 2; assertEventually( "raft logs eventually pruned", () -> numberOfFiles( storeDir ), equalTo( expectedNumberOfLogFilesAfterPruning ), 1, TimeUnit.SECONDS ); diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/scenarios/CoreServerReplicationIT.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/scenarios/CoreServerReplicationIT.java index 0f4b37696d058..311700aca6f99 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/scenarios/CoreServerReplicationIT.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/scenarios/CoreServerReplicationIT.java @@ -28,6 +28,7 @@ import java.util.concurrent.TimeoutException; import org.neo4j.coreedge.discovery.Cluster; +import org.neo4j.coreedge.discovery.CoreServer; import org.neo4j.coreedge.server.core.CoreGraphDatabase; import org.neo4j.function.Predicates; import org.neo4j.graphdb.Node; @@ -65,7 +66,7 @@ public void shouldReplicateTransactionsToCoreServers() throws Exception node.setProperty( "foobar", "baz_bat" ); tx.success(); } ); - CoreGraphDatabase last = cluster.coreTx( ( db, tx ) -> { + CoreServer last = cluster.coreTx( ( db, tx ) -> { db.schema().indexFor( label( "boo" ) ).on( "foobar" ).create(); tx.success(); } ); @@ -79,7 +80,7 @@ public void shouldReplicateTransactionsToCoreServers() throws Exception public void shouldReplicateTransactionToCoreServerAddedAfterInitialStartUp() throws Exception { // given - cluster.addCoreServerWithServerId( 3, 4 ); + cluster.addCoreServerWithServerId( 3, 4 ).start(); cluster.coreTx( ( db, tx ) -> { Node node = db.createNode(); @@ -88,8 +89,8 @@ public void shouldReplicateTransactionToCoreServerAddedAfterInitialStartUp() thr } ); // when - cluster.addCoreServerWithServerId( 4, 5 ); - CoreGraphDatabase last = cluster.coreTx( ( db, tx ) -> { + cluster.addCoreServerWithServerId( 4, 5 ).start(); + CoreServer last = cluster.coreTx( ( db, tx ) -> { Node node = db.createNode(); node.setProperty( "foobar", "baz_bat" ); tx.success(); @@ -112,7 +113,7 @@ public void shouldReplicateTransactionAfterLeaderWasRemovedFromCluster() throws // when cluster.removeCoreServer( cluster.awaitLeader() ); - CoreGraphDatabase last = cluster.coreTx( ( db, tx ) -> { + CoreServer last = cluster.coreTx( ( db, tx ) -> { Node node = db.createNode(); node.setProperty( "foobar", "baz_bat" ); tx.success(); @@ -127,7 +128,7 @@ public void shouldReplicateTransactionAfterLeaderWasRemovedFromCluster() throws public void shouldReplicateToCoreServersAddedAfterInitialTransactions() throws Exception { // when - CoreGraphDatabase last = null; + CoreServer last = null; for ( int i = 0; i < 15; i++ ) { last = cluster.coreTx( ( db, tx ) -> { @@ -137,16 +138,17 @@ public void shouldReplicateToCoreServersAddedAfterInitialTransactions() throws E } ); } - cluster.addCoreServerWithServerId( 3, 4 ); - cluster.addCoreServerWithServerId( 4, 5 ); + cluster.addCoreServerWithServerId( 3, 4 ).start(); + cluster.addCoreServerWithServerId( 4, 5 ).start(); // then assertEquals( 15, countNodes( last ) ); dataMatchesEventually( last, cluster.coreServers() ); } - private long countNodes( CoreGraphDatabase db ) + private long countNodes( CoreServer server ) { + CoreGraphDatabase db = server.database(); long count; try ( Transaction tx = db.beginTx() ) { @@ -156,13 +158,14 @@ private long countNodes( CoreGraphDatabase db ) return count; } - private void dataMatchesEventually( CoreGraphDatabase sourceDB, Collection targetDBs ) throws + private void dataMatchesEventually( CoreServer server, Collection targetDBs ) throws TimeoutException, InterruptedException { + CoreGraphDatabase sourceDB = server.database(); DbRepresentation sourceRepresentation = DbRepresentation.of( sourceDB ); - for ( CoreGraphDatabase targetDB : targetDBs ) + for ( CoreServer targetDB : targetDBs ) { - Predicates.await( () -> sourceRepresentation.equals( DbRepresentation.of( targetDB ) ), + Predicates.await( () -> sourceRepresentation.equals( DbRepresentation.of( targetDB.database() ) ), DEFAULT_TIMEOUT_MS, TimeUnit.MILLISECONDS ); } } diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/scenarios/CoreToCoreCopySnapshotIT.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/scenarios/CoreToCoreCopySnapshotIT.java index 6c4c43344d126..3c8312335ea17 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/scenarios/CoreToCoreCopySnapshotIT.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/scenarios/CoreToCoreCopySnapshotIT.java @@ -22,33 +22,28 @@ import org.junit.Rule; import org.junit.Test; -import java.io.File; import java.util.Map; -import java.util.SortedMap; import java.util.concurrent.TimeoutException; import org.neo4j.coreedge.discovery.Cluster; -import org.neo4j.coreedge.raft.log.segmented.FileNames; +import org.neo4j.coreedge.discovery.CoreServer; +import org.neo4j.coreedge.discovery.HazelcastDiscoveryServiceFactory; import org.neo4j.coreedge.raft.roles.Role; -import org.neo4j.coreedge.raft.state.CoreState; import org.neo4j.coreedge.server.CoreEdgeClusterSettings; import org.neo4j.coreedge.server.core.CoreGraphDatabase; import org.neo4j.graphdb.GraphDatabaseService; import org.neo4j.graphdb.Node; import org.neo4j.graphdb.Relationship; import org.neo4j.graphdb.RelationshipType; -import org.neo4j.io.fs.DefaultFileSystemAbstraction; -import org.neo4j.kernel.configuration.Config; import org.neo4j.test.DbRepresentation; import org.neo4j.test.coreedge.ClusterRule; import static junit.framework.TestCase.assertEquals; import static org.hamcrest.Matchers.greaterThan; import static org.junit.Assert.assertThat; -import static org.neo4j.coreedge.raft.log.segmented.SegmentedRaftLog.SEGMENTED_LOG_DIRECTORY_NAME; + import static org.neo4j.coreedge.server.CoreEdgeClusterSettings.raft_log_pruning_frequency; import static org.neo4j.coreedge.server.CoreEdgeClusterSettings.raft_log_pruning_strategy; -import static org.neo4j.coreedge.server.core.EnterpriseCoreEditionModule.CLUSTER_STATE_DIRECTORY_NAME; import static org.neo4j.helpers.collection.MapUtil.stringMap; public class CoreToCoreCopySnapshotIT @@ -66,14 +61,14 @@ public void shouldBeAbleToDownloadFreshEmptySnapshot() throws Exception // given Cluster cluster = clusterRule.startCluster(); - CoreGraphDatabase leader = cluster.awaitLeader( TIMEOUT_MS ); + CoreServer leader = cluster.awaitLeader( TIMEOUT_MS ); // when - CoreGraphDatabase follower = cluster.awaitCoreGraphDatabaseWithRole( TIMEOUT_MS, Role.FOLLOWER ); - getCoreState( follower ).downloadSnapshot( leader.id() ); + CoreServer follower = cluster.awaitCoreGraphDatabaseWithRole( TIMEOUT_MS, Role.FOLLOWER ); + follower.coreState().downloadSnapshot( leader.id() ); // then - assertEquals( DbRepresentation.of( leader ), DbRepresentation.of( follower ) ); + assertEquals( DbRepresentation.of( leader.database() ), DbRepresentation.of( follower.database() ) ); } @Test @@ -82,19 +77,19 @@ public void shouldBeAbleToDownloadSmallFreshSnapshot() throws Exception // given Cluster cluster = clusterRule.startCluster(); - CoreGraphDatabase source = cluster.coreTx( ( db, tx ) -> { + CoreServer source = cluster.coreTx( ( db, tx ) -> { Node node = db.createNode(); node.setProperty( "hej", "svej" ); tx.success(); } ); // when - CoreGraphDatabase follower = cluster.awaitCoreGraphDatabaseWithRole( TIMEOUT_MS, Role.FOLLOWER ); + CoreServer follower = cluster.awaitCoreGraphDatabaseWithRole( TIMEOUT_MS, Role.FOLLOWER ); - getCoreState( follower ).downloadSnapshot( source.id() ); + follower.coreState().downloadSnapshot( source.id() ); // then - assertEquals( DbRepresentation.of( source ), DbRepresentation.of( follower ) ); + assertEquals( DbRepresentation.of( source.database() ), DbRepresentation.of( follower.database() ) ); } @Test @@ -103,17 +98,17 @@ public void shouldBeAbleToDownloadLargerFreshSnapshot() throws Exception // given Cluster cluster = clusterRule.startCluster(); - CoreGraphDatabase source = cluster.coreTx( ( db, tx ) -> { + CoreServer source = cluster.coreTx( ( db, tx ) -> { createData( db, 1000 ); tx.success(); } ); // when - CoreGraphDatabase follower = cluster.awaitCoreGraphDatabaseWithRole( TIMEOUT_MS, Role.FOLLOWER ); - getCoreState( follower ).downloadSnapshot( source.id() ); + CoreServer follower = cluster.awaitCoreGraphDatabaseWithRole( TIMEOUT_MS, Role.FOLLOWER ); + follower.coreState().downloadSnapshot( source.id() ); // then - assertEquals( DbRepresentation.of( source ), DbRepresentation.of( follower ) ); + assertEquals( DbRepresentation.of( source.database() ), DbRepresentation.of( follower.database() ) ); } @Test @@ -126,26 +121,26 @@ public void shouldBeAbleToDownloadToNewInstanceAfterPruning() throws Exception Cluster cluster = clusterRule.withSharedCoreParams( params ).startCluster(); - CoreGraphDatabase leader = cluster.coreTx( ( db, tx ) -> { + CoreServer leader = cluster.coreTx( ( db, tx ) -> { createData( db, 10000 ); tx.success(); } ); // when - for ( CoreGraphDatabase coreDb : cluster.coreServers() ) + for ( CoreServer coreDb : cluster.coreServers() ) { - getCoreState( coreDb ).compact(); + coreDb.coreState().compact(); } cluster.removeCoreServer( leader ); // to force a change of leader leader = cluster.awaitLeader(); int newDbId = 3; - cluster.addCoreServerWithServerId( newDbId, 3 ); - CoreGraphDatabase newDb = cluster.getCoreServerById( newDbId ); + cluster.addCoreServerWithServerId( newDbId, 3 ).start(); + CoreGraphDatabase newDb = cluster.getCoreServerById( newDbId ).database(); // then - assertEquals( DbRepresentation.of( leader ), DbRepresentation.of( newDb ) ); + assertEquals( DbRepresentation.of( leader.database() ), DbRepresentation.of( newDb ) ); } @Test @@ -160,7 +155,7 @@ public void shouldBeAbleToDownloadToRejoinedInstanceAfterPruning() throws Except //Start the cluster and accumulate some log files. Cluster cluster = clusterRule.withSharedCoreParams( coreParams ).startCluster(); - CoreGraphDatabase leader = cluster.awaitCoreGraphDatabaseWithRole( TIMEOUT_MS, Role.LEADER ); + CoreServer leader = cluster.awaitCoreGraphDatabaseWithRole( TIMEOUT_MS, Role.LEADER ); int followersLastLog = getMostRecentLogIdOn( leader ); while ( followersLastLog < 2 ) { @@ -168,9 +163,8 @@ public void shouldBeAbleToDownloadToRejoinedInstanceAfterPruning() throws Except followersLastLog = getMostRecentLogIdOn( leader ); } - CoreGraphDatabase follower = cluster.awaitCoreGraphDatabaseWithRole( TIMEOUT_MS, Role.FOLLOWER ); + CoreServer follower = cluster.awaitCoreGraphDatabaseWithRole( TIMEOUT_MS, Role.FOLLOWER ); follower.shutdown(); - Config config = follower.getDependencyResolver().resolveDependency( Config.class ); /** * After a follower is shutdown, wait until we accumulate some logs such that the oldest log is older than @@ -189,7 +183,7 @@ public void shouldBeAbleToDownloadToRejoinedInstanceAfterPruning() throws Except //then assertThat( leadersOldestLog, greaterThan( followersLastLog ) ); //The cluster member should join. Otherwise this line will hang forever. - cluster.addCoreServerWithServerId( cluster.serverIdFor( follower ), 3 ); + follower.start(); } static void createData( GraphDatabaseService db, int size ) @@ -207,21 +201,14 @@ static void createData( GraphDatabaseService db, int size ) } } - private int getOldestLogIdOn( CoreGraphDatabase clusterMember ) throws TimeoutException - { - return getLogFileNames( clusterMember ).firstKey().intValue(); - } - - private int getMostRecentLogIdOn( CoreGraphDatabase clusterMember ) throws TimeoutException + private int getOldestLogIdOn( CoreServer clusterMember ) throws TimeoutException { - return getLogFileNames( clusterMember ).lastKey().intValue(); + return clusterMember.getLogFileNames().firstKey().intValue(); } - private SortedMap getLogFileNames( CoreGraphDatabase clusterMember ) + private int getMostRecentLogIdOn( CoreServer clusterMember ) throws TimeoutException { - File clusterDir = new File( clusterMember.getStoreDir(), CLUSTER_STATE_DIRECTORY_NAME ); - File logFilesDir = new File( clusterDir, SEGMENTED_LOG_DIRECTORY_NAME ); - return new FileNames( logFilesDir ).getAllFiles( new DefaultFileSystemAbstraction(), null ); + return clusterMember.getLogFileNames().lastKey().intValue(); } private void doSomeTransactions( Cluster cluster, int count ) @@ -254,8 +241,4 @@ private String string( int numberOfCharacters ) return s.toString(); } - private CoreState getCoreState( CoreGraphDatabase follower ) - { - return follower.getDependencyResolver().resolveDependency( CoreState.class ); - } } diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/scenarios/EdgeServerReplicationIT.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/scenarios/EdgeServerReplicationIT.java index dc04ba6914d61..4b51ec1761cb2 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/scenarios/EdgeServerReplicationIT.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/scenarios/EdgeServerReplicationIT.java @@ -19,16 +19,20 @@ */ package org.neo4j.coreedge.scenarios; +import org.junit.Ignore; import org.junit.Rule; import org.junit.Test; import java.io.File; +import java.io.IOException; import java.util.Map; import java.util.SortedMap; import java.util.concurrent.TimeoutException; import java.util.function.BinaryOperator; import org.neo4j.coreedge.discovery.Cluster; +import org.neo4j.coreedge.discovery.CoreServer; +import org.neo4j.coreedge.discovery.EdgeServer; import org.neo4j.coreedge.raft.log.segmented.FileNames; import org.neo4j.coreedge.server.CoreEdgeClusterSettings; import org.neo4j.coreedge.server.core.CoreGraphDatabase; @@ -40,6 +44,10 @@ import org.neo4j.graphdb.Transaction; import org.neo4j.graphdb.TransactionFailureException; import org.neo4j.io.fs.DefaultFileSystemAbstraction; +import org.neo4j.io.fs.FileUtils; +import org.neo4j.io.pagecache.PageCache; +import org.neo4j.kernel.impl.pagecache.StandalonePageCacheFactory; +import org.neo4j.kernel.impl.store.MetaDataStore; import org.neo4j.kernel.impl.store.format.highlimit.HighLimit; import org.neo4j.kernel.impl.store.format.standard.StandardV3_0; import org.neo4j.logging.Log; @@ -60,6 +68,7 @@ import static org.neo4j.coreedge.server.core.EnterpriseCoreEditionModule.CLUSTER_STATE_DIRECTORY_NAME; import static org.neo4j.helpers.collection.Iterables.count; import static org.neo4j.helpers.collection.MapUtil.stringMap; +import static org.neo4j.kernel.impl.store.MetaDataStore.Position.TIME; import static org.neo4j.test.assertion.Assert.assertEventually; public class EdgeServerReplicationIT @@ -75,7 +84,7 @@ public void shouldNotBeAbleToWriteToEdge() throws Exception // given Cluster cluster = clusterRule.startCluster(); - GraphDatabaseService edgeDB = cluster.findAnEdgeServer(); + EdgeGraphDatabase edgeDB = cluster.findAnEdgeServer().database(); // when (write should fail) boolean transactionFailed = false; @@ -102,9 +111,9 @@ public void allServersBecomeAvailable() throws Exception Cluster cluster = clusterRule.startCluster(); // then - for ( final EdgeGraphDatabase edgeGraphDatabase : cluster.edgeServers() ) + for ( final EdgeServer edgeServer : cluster.edgeServers() ) { - ThrowingSupplier availability = () -> edgeGraphDatabase.isAvailable( 0 ); + ThrowingSupplier availability = () -> edgeServer.database().isAvailable( 0 ); assertEventually( "edge server becomes available", availability, is( true ), 10, SECONDS ); } } @@ -125,7 +134,7 @@ public void shouldEventuallyPullTransactionDownToAllEdgeServers() throws Excepti } }, cluster ); - cluster.addEdgeServerWithId( 0 ); + cluster.addEdgeServerWithId( 0 ).start(); // when executeOnLeaderWithRetry( db -> { @@ -134,8 +143,9 @@ public void shouldEventuallyPullTransactionDownToAllEdgeServers() throws Excepti }, cluster ); // then - for ( final GraphDatabaseService edgeDB : cluster.edgeServers() ) + for ( final EdgeServer server : cluster.edgeServers() ) { + GraphDatabaseService edgeDB = server.database(); try ( Transaction tx = edgeDB.beginTx() ) { ThrowingSupplier nodeCount = () -> count( edgeDB.getAllNodes() ); @@ -153,10 +163,9 @@ public void shouldEventuallyPullTransactionDownToAllEdgeServers() throws Excepti } @Test + @Ignore("WIP: Turn this back on once Max/Davide have fixed the Edge Server StoreId stuff") public void shouldShutdownRatherThanPullUpdatesFromCoreServerWithDifferentStoreIfServerHasData() throws Exception { - File edgeDatabaseStoreFileLocation = clusterRule.testDirectory().directory( "edgeStore" ); - createExistingEdgeStore( edgeDatabaseStoreFileLocation ); Cluster cluster = clusterRule.withNumberOfEdgeServers( 0 ).startCluster(); executeOnLeaderWithRetry( db -> { @@ -167,17 +176,34 @@ public void shouldShutdownRatherThanPullUpdatesFromCoreServerWithDifferentStoreI } }, cluster ); + EdgeServer edgeServer = cluster.addEdgeServerWithId( 4 ); + putSomeDataWithDifferentStoreId(edgeServer.storeDir(), cluster.getCoreServerById( 0 ).storeDir()); try { - cluster.addEdgeServerWithFileLocation( edgeDatabaseStoreFileLocation ); - fail(); + edgeServer.start(); + fail("Should have failed to start"); } - catch ( Throwable required ) + catch ( RuntimeException required ) { // Lifecycle should throw exception, server should not start. } } + private void putSomeDataWithDifferentStoreId( File storeDir, File coreStoreDir ) throws IOException + { + FileUtils.copyRecursively( coreStoreDir, storeDir ); + changeStoreId( storeDir ); + } + + private void changeStoreId( File storeDir ) throws IOException + { + File neoStoreFile = new File( storeDir, MetaDataStore.DEFAULT_NAME ); + try ( PageCache pageCache = StandalonePageCacheFactory.createPageCache( new DefaultFileSystemAbstraction() ) ) + { + MetaDataStore.setRecord( pageCache, neoStoreFile, TIME, System.currentTimeMillis() ); + } + } + @Test public void shouldThrowExceptionIfEdgeRecordFormatDiffersToCoreRecordFormat() throws Exception { @@ -195,7 +221,7 @@ public void shouldThrowExceptionIfEdgeRecordFormatDiffersToCoreRecordFormat() th try { - cluster.addEdgeServerWithFileLocation( 0, StandardV3_0.NAME ); + cluster.addEdgeServerWithIdAndRecordFormat( 0, StandardV3_0.NAME ); } catch ( Exception e ) { @@ -229,9 +255,9 @@ public void shouldBeAbleToCopyStoresFromCoreToEdge() throws Exception tx.success(); } ); - long baseVersion = versionBy( new File( cluster.awaitLeader().getStoreDir() ), Math::max ); + long baseVersion = versionBy( cluster.awaitLeader().storeDir(), Math::max ); - CoreGraphDatabase coreGraphDatabase = null; + CoreServer coreGraphDatabase = null; for ( int j = 0; j < 2; j++ ) { coreGraphDatabase = cluster.coreTx( ( db, tx ) -> { @@ -244,16 +270,16 @@ public void shouldBeAbleToCopyStoresFromCoreToEdge() throws Exception } ); } - File storeDir = new File( coreGraphDatabase.getStoreDir() ); + File storeDir = coreGraphDatabase.storeDir(); assertEventually( "pruning happened", () -> versionBy( storeDir, Math::min ), greaterThan( baseVersion ), 1, SECONDS ); // when - cluster.addEdgeServerWithFileLocation( 42, HighLimit.NAME ); + cluster.addEdgeServerWithIdAndRecordFormat( 42, HighLimit.NAME ).start(); // then - for ( final EdgeGraphDatabase edge : cluster.edgeServers() ) + for ( final EdgeServer edge : cluster.edgeServers() ) { - assertEventually( "edge server available", () -> edge.isAvailable( 0 ), is( true ), 10, SECONDS ); + assertEventually( "edge server available", () -> edge.database().isAvailable( 0 ), is( true ), 10, SECONDS ); } } @@ -265,26 +291,12 @@ private long versionBy( File storeDir, BinaryOperator operator ) return logs.keySet().stream().reduce( operator ).orElseThrow( IllegalStateException::new ); } - private void createExistingEdgeStore( File dir ) - { - GraphDatabaseService db = - new TestGraphDatabaseFactory().newEmbeddedDatabase( Cluster.edgeServerStoreDirectory( dir, 1966 ) ); - - try ( Transaction tx = db.beginTx() ) - { - db.createNode(); - tx.success(); - } - - db.shutdown(); - } - private GraphDatabaseService executeOnLeaderWithRetry( Workload workload, Cluster cluster ) throws TimeoutException { CoreGraphDatabase coreDB; while ( true ) { - coreDB = cluster.awaitLeader( 5000 ); + coreDB = cluster.awaitLeader( 5000 ).database(); try ( Transaction tx = coreDB.beginTx() ) { workload.doWork( coreDB ); diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/scenarios/HazelcastClientLifeCycleIT.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/scenarios/HazelcastClientLifeCycleIT.java deleted file mode 100644 index 05f87aa9f0b95..0000000000000 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/scenarios/HazelcastClientLifeCycleIT.java +++ /dev/null @@ -1,116 +0,0 @@ -/* - * Copyright (c) 2002-2016 "Neo Technology," - * Network Engine for Objects in Lund AB [http://neotechnology.com] - * - * This file is part of Neo4j. - * - * Neo4j is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License as - * published by the Free Software Foundation, either version 3 of the - * License, or (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ -package org.neo4j.coreedge.scenarios; - -public class HazelcastClientLifeCycleIT -{ -// public final -// @Rule -// TargetDirectory.TestDirectory dir = TargetDirectory.testDirForTest( getClass() ); -// -// @Rule -// public ExpectedException exceptionMatcher = ExpectedException.none(); -// -// private Cluster cluster; -// -// @After -// public void shutdown() -// { -// if ( cluster != null ) -// { -// cluster.shutdown(); -// } -// } -// -// @Test -// public void shouldConnectToCoreClusterAsLongAsOneInitialHostIsAvailable() throws Throwable -// { -// // given -// cluster = start( dir.directory(), 2, 0 ); -// -// // when -// -// ListenSocketAddress goodHostnamePort = hostnamePort( cluster.coreServers().iterator().next() ); -// InetSocketAddress address = goodHostnamePort.socketAddress(); -// String badHost = "localhost:9999"; -// String goodHost = address.getHostString() + ":" + address.getPort(); -// -// HazelcastClientLifecycle client = new HazelcastClientLifecycle( getConfig( badHost + "," + goodHost )); -// -// // then -// assertEquals( 2, client.currentTopology().getNumberOfCoreServers() ); -// } -// -// @Test -// public void shouldConnectToCopeWithShutdownCore() throws Throwable -// { -// // given -// cluster = start( dir.directory(), 2, 0 ); -// -// // when -// -// ListenSocketAddress goodHostnamePort = hostnamePort( cluster.coreServers().iterator().next() ); -// InetSocketAddress address = goodHostnamePort.socketAddress(); -// String badHost = "localhost:9999"; -// String goodHost = address.getHostString() + ":" + address.getPort(); -// -// HazelcastClientLifecycle client = new HazelcastClientLifecycle( getConfig( badHost + "," + goodHost )); -// -// client.start(); -// -// cluster.shutdown(); -// -// // then -// assertEquals( 2, client.currentTopology().getNumberOfCoreServers() ); -// -// client.stop(); -// } -// -// private ListenSocketAddress hostnamePort( CoreGraphDatabase aCoreServer ) -// { -// return aCoreServer.getDependencyResolver() -// .resolveDependency( Config.class ).get( CoreEdgeClusterSettings.cluster_listen_address ); -// } -// -// @Test -// public void shouldThrowAnExceptionIfUnableToConnectToCoreCluster() throws Throwable -// { -// // when -// String badHost = "localhost:9999"; -// HazelcastClientLifecycle client = new HazelcastClientLifecycle( getConfig( badHost ) ); -// -// // then -// exceptionMatcher.expect( EdgeServerConnectionException.class ); -// -// client.start(); -// client.stop(); -// } -// -// -// private Config getConfig( String initialHosts ) -// { -// Map params = stringMap(); -// params.put( "dbms.mode", "CORE_EDGE" ); -// params.put( ClusterSettings.cluster_name.name(), Cluster.CLUSTER_NAME ); -// params.put( ClusterSettings.server_id.name(), String.valueOf( 99 ) ); -// params.put( CoreEdgeClusterSettings.initial_core_cluster_members.name(), initialHosts ); -// return new Config( params ); -// } -} diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/scenarios/RecoveryIT.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/scenarios/RecoveryIT.java index 530ba949e2256..c2800980e2cc2 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/scenarios/RecoveryIT.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/scenarios/RecoveryIT.java @@ -23,18 +23,14 @@ import org.junit.Test; import java.io.File; -import java.util.ArrayList; import java.util.Set; -import java.util.stream.Collectors; import org.neo4j.consistency.ConsistencyCheckService; import org.neo4j.coreedge.discovery.Cluster; -import org.neo4j.coreedge.server.core.CoreGraphDatabase; +import org.neo4j.coreedge.discovery.CoreServer; import org.neo4j.graphdb.Node; -import org.neo4j.graphdb.Transaction; import org.neo4j.helpers.progress.ProgressMonitorFactory; import org.neo4j.kernel.configuration.Config; -import org.neo4j.logging.FormattedLogProvider; import org.neo4j.logging.NullLogProvider; import org.neo4j.test.DbRepresentation; import org.neo4j.test.coreedge.ClusterRule; @@ -60,8 +56,7 @@ public void shouldBeConsistentAfterShutdown() throws Exception fireSomeLoadAtTheCluster( cluster ); - Set storeDirs = cluster.coreServers().stream() - .map( CoreGraphDatabase::getStoreDir ).map( File::new ).collect( toSet() ); + Set storeDirs = cluster.coreServers().stream().map( CoreServer::storeDir ).collect( toSet() ); // when cluster.shutdown(); @@ -79,15 +74,14 @@ public void singleServerWithinClusterShouldBeConsistentAfterRestart() throws Exc fireSomeLoadAtTheCluster( cluster ); - Set storeDirs = cluster.coreServers().stream() - .map( CoreGraphDatabase::getStoreDir ).map( File::new ).collect( toSet() ); + Set storeDirs = cluster.coreServers().stream().map( CoreServer::storeDir ).collect( toSet() ); // when for ( int i = 0; i < clusterSize; i++ ) { cluster.removeCoreServerWithServerId( i ); fireSomeLoadAtTheCluster( cluster ); - cluster.addCoreServerWithServerId( i, clusterSize ); + cluster.addCoreServerWithServerId( i, clusterSize ).start(); } cluster.shutdown(); diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/scenarios/RestartIT.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/scenarios/RestartIT.java index 68fcb67a95af3..12e209fe8d12a 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/scenarios/RestartIT.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/scenarios/RestartIT.java @@ -19,19 +19,17 @@ */ package org.neo4j.coreedge.scenarios; -import java.io.File; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicBoolean; -import org.junit.After; import org.junit.Rule; import org.junit.Test; import org.neo4j.consistency.ConsistencyCheckService; import org.neo4j.coreedge.discovery.Cluster; -import org.neo4j.coreedge.server.core.CoreGraphDatabase; -import org.neo4j.coreedge.server.edge.EdgeGraphDatabase; +import org.neo4j.coreedge.discovery.CoreServer; +import org.neo4j.coreedge.discovery.EdgeServer; import org.neo4j.graphdb.GraphDatabaseService; import org.neo4j.graphdb.Node; import org.neo4j.graphdb.Transaction; @@ -88,7 +86,7 @@ public void restartWhileDoingTransactions() throws Exception Cluster cluster = clusterRule.startCluster(); // when - final GraphDatabaseService coreDB = cluster.getCoreServerById( 0 ); + final GraphDatabaseService coreDB = cluster.getCoreServerById( 0 ).database(); ExecutorService executor = Executors.newCachedThreadPool(); @@ -126,7 +124,7 @@ public void edgeTest() throws Exception Cluster cluster = clusterRule.withNumberOfCoreServers( 2 ).withNumberOfEdgeServers( 1 ).startCluster(); // when - final GraphDatabaseService coreDB = cluster.awaitLeader( 5000 ); + final GraphDatabaseService coreDB = cluster.awaitLeader( 5000 ).database(); try ( Transaction tx = coreDB.beginTx() ) { @@ -135,21 +133,21 @@ public void edgeTest() throws Exception tx.success(); } - cluster.addCoreServerWithServerId( 2, 3 ); + cluster.addCoreServerWithServerId( 2, 3 ).start(); cluster.shutdown(); - for ( CoreGraphDatabase core : cluster.coreServers() ) + for ( CoreServer core : cluster.coreServers() ) { ConsistencyCheckService.Result result = new ConsistencyCheckService().runFullConsistencyCheck( - new File( core.getStoreDir() ),Config.defaults(), ProgressMonitorFactory.NONE, + core.storeDir(), Config.defaults(), ProgressMonitorFactory.NONE, FormattedLogProvider.toOutputStream( System.out ), new DefaultFileSystemAbstraction(), false ); assertTrue( "Inconsistent: " + core, result.isSuccessful() ); } - for ( EdgeGraphDatabase edge : cluster.edgeServers() ) + for ( EdgeServer edge : cluster.edgeServers() ) { ConsistencyCheckService.Result result = new ConsistencyCheckService().runFullConsistencyCheck( - new File( edge.getStoreDir() ), Config.defaults(), ProgressMonitorFactory.NONE, + edge.storeDir(), Config.defaults(), ProgressMonitorFactory.NONE, FormattedLogProvider.toOutputStream( System.out ), new DefaultFileSystemAbstraction(), false ); assertTrue( "Inconsistent: " + edge, result.isSuccessful() ); } diff --git a/enterprise/core-edge/src/test/java/org/neo4j/test/coreedge/ClusterRule.java b/enterprise/core-edge/src/test/java/org/neo4j/test/coreedge/ClusterRule.java index b57b97c76b90c..fb0d04b2224ce 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/test/coreedge/ClusterRule.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/test/coreedge/ClusterRule.java @@ -100,14 +100,22 @@ protected void after() * cluster is up and all members report each other as available. */ public Cluster startCluster() throws Exception + { + createCluster(); + cluster.start(); + cluster.awaitLeader(); + return cluster; + } + + public Cluster createCluster() throws Exception { if ( cluster == null ) { cluster = new Cluster( clusterDirectory, noCoreServers, noEdgeServers, factory, coreParams, instanceCoreParams, edgeParams, instanceEdgeParams, recordFormat ); - cluster.start(); + } - cluster.awaitLeader(); + return cluster; } diff --git a/enterprise/metrics/src/test/java/org/neo4j/metrics/CoreEdgeMetricsIT.java b/enterprise/metrics/src/test/java/org/neo4j/metrics/CoreEdgeMetricsIT.java index b31cfef5d7b8a..63825544e92a9 100644 --- a/enterprise/metrics/src/test/java/org/neo4j/metrics/CoreEdgeMetricsIT.java +++ b/enterprise/metrics/src/test/java/org/neo4j/metrics/CoreEdgeMetricsIT.java @@ -26,23 +26,20 @@ import java.io.File; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; import org.neo4j.coreedge.discovery.Cluster; +import org.neo4j.coreedge.discovery.CoreServer; +import org.neo4j.coreedge.discovery.EdgeServer; import org.neo4j.coreedge.server.core.CoreGraphDatabase; -import org.neo4j.coreedge.server.edge.EdgeGraphDatabase; -import org.neo4j.cypher.internal.compiler.v3_1.planner.logical.Metrics; import org.neo4j.function.ThrowingSupplier; -import org.neo4j.graphdb.GraphDatabaseService; import org.neo4j.graphdb.Node; import org.neo4j.graphdb.Transaction; -import org.neo4j.helpers.collection.Iterables; import org.neo4j.kernel.configuration.Config; import org.neo4j.kernel.configuration.Settings; -import org.neo4j.kernel.impl.factory.GraphDatabaseFacade; import org.neo4j.kernel.internal.GraphDatabaseAPI; import org.neo4j.metrics.source.coreedge.CoreMetrics; import org.neo4j.test.coreedge.ClusterRule; -import org.neo4j.test.rule.TargetDirectory; import static java.util.concurrent.TimeUnit.SECONDS; import static org.hamcrest.CoreMatchers.equalTo; @@ -94,7 +91,7 @@ public void shouldMonitorCoreEdge() throws Exception cluster = clusterRule.startCluster(); // when - CoreGraphDatabase coreDB = cluster.awaitLeader( 5000 ); + CoreGraphDatabase coreDB = cluster.awaitLeader( 5000 ).database(); try ( Transaction tx = coreDB.beginTx() ) { @@ -104,12 +101,17 @@ public void shouldMonitorCoreEdge() throws Exception } // then - for ( GraphDatabaseAPI db : concat( cluster.coreServers(), cluster.edgeServers() ) ) + for ( CoreServer db : cluster.coreServers() ) { - assertAllNodesVisible( db ); + assertAllNodesVisible( db.database() ); } - File coreServerMetricsDir = new File( cluster.getCoreServerById( 0 ).getStoreDir(), csvPath.getDefaultValue() ); + for ( EdgeServer db : cluster.edgeServers() ) + { + assertAllNodesVisible( db.database() ); + } + + File coreServerMetricsDir = new File( cluster.getCoreServerById( 0 ).storeDir(), csvPath.getDefaultValue() ); assertEventually( "append index eventually accurate", () -> readLongValue( metricsCsv( coreServerMetricsDir, CoreMetrics.APPEND_INDEX ) ), @@ -129,7 +131,7 @@ public void shouldMonitorCoreEdge() throws Exception assertEventually( "tx pull requests received eventually accurate", () -> { long total = 0; - for ( final CoreGraphDatabase db : cluster.coreServers() ) + for ( final CoreGraphDatabase db : cluster.coreServers().stream().map( CoreServer::database ).collect( Collectors.toList()) ) { File metricsDir = new File( db.getStoreDir(), "metrics" ); total += readLongValue( metricsCsv( metricsDir, CoreMetrics.TX_PULL_REQUESTS_RECEIVED ) );