diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/edge/EdgeServerStartupProcess.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/edge/EdgeServerStartupProcess.java index 11ff366025e01..be9bf9998b85e 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/edge/EdgeServerStartupProcess.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/edge/EdgeServerStartupProcess.java @@ -19,6 +19,8 @@ */ package org.neo4j.coreedge.server.edge; +import java.util.concurrent.locks.LockSupport; + import org.neo4j.coreedge.catchup.storecopy.LocalDatabase; import org.neo4j.coreedge.catchup.storecopy.edge.StoreFetcher; import org.neo4j.coreedge.discovery.CoreServerSelectionException; @@ -26,6 +28,7 @@ import org.neo4j.coreedge.raft.replication.tx.RetryStrategy; import org.neo4j.coreedge.server.CoreMember; import org.neo4j.kernel.configuration.Config; +import org.neo4j.kernel.impl.store.MismatchingStoreIdException; import org.neo4j.kernel.impl.transaction.state.DataSourceManager; import org.neo4j.kernel.lifecycle.Lifecycle; import org.neo4j.logging.Log; @@ -75,30 +78,42 @@ public void init() throws Throwable @Override public void start() throws Throwable { - boolean copiedStore = false; - do + dataSourceManager.start(); + + if ( localDatabase.isEmpty() ) + { + CoreMember transactionServer = findCoreMemberToCopyFrom(); + localDatabase.stop(); + localDatabase.copyStoreFrom( transactionServer, storeFetcher ); + localDatabase.start(); + } + else + { + throw new IllegalStateException( "Local database is not empty cannot copy store" ); + } + + txPulling.start(); + } + + private CoreMember findCoreMemberToCopyFrom() + { + while ( true ) { try { - CoreMember transactionServer = connectionStrategy.coreServer(); - log.info( "Server starting, connecting to core server at %s", transactionServer.toString() ); + CoreMember coreMember = connectionStrategy.coreServer(); + log.info( "Server starting, connecting to core server at %s", coreMember.toString() ); discoveryService.registerEdgeServer( extractBoltAddress( config ) ); - - localDatabase.copyStoreFrom( transactionServer, storeFetcher ); - copiedStore = true; + return coreMember; } catch ( CoreServerSelectionException ex ) { log.info( "Failed to connect to core server. Retrying in %d ms.", timeout.getMillis() ); - Thread.sleep( timeout.getMillis() ); + LockSupport.parkUntil( timeout.getMillis() + System.currentTimeMillis() ); timeout.increment(); } - - } while ( !copiedStore ); - - dataSourceManager.start(); - txPulling.start(); + } } @Override diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/EdgeServerStartupProcessTest.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/EdgeServerStartupProcessTest.java index 9e93dd52cae94..6da26495b8604 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/EdgeServerStartupProcessTest.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/EdgeServerStartupProcessTest.java @@ -21,6 +21,7 @@ import org.junit.Test; +import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.UUID; @@ -41,25 +42,20 @@ import org.neo4j.logging.NullLogProvider; import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static org.junit.Assert.fail; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.verifyZeroInteractions; import static org.mockito.Mockito.when; import static org.neo4j.helpers.collection.Iterators.asSet; public class EdgeServerStartupProcessTest { @Test - public void startShouldReplaceLocalStoreWithStoreFromCoreServerAndStartPolling() throws Throwable + public void startShouldReplaceTheEmptyLocalStoreWithStoreFromCoreServerAndStartPolling() throws Throwable { // given - final Map params = new HashMap<>(); - - params.put( new GraphDatabaseSettings.BoltConnector( "bolt" ).type.name(), "BOLT" ); - params.put( new GraphDatabaseSettings.BoltConnector( "bolt" ).enabled.name(), "true" ); - params.put( new GraphDatabaseSettings.BoltConnector( "bolt" ).address.name(), "127.0.0.1:" + 8001 ); - - Config config = new Config( params ); - StoreFetcher storeFetcher = mock( StoreFetcher.class ); LocalDatabase localDatabase = mock( LocalDatabase.class ); @@ -78,15 +74,61 @@ public void startShouldReplaceLocalStoreWithStoreFromCoreServerAndStartPolling() EdgeServerStartupProcess edgeServerStartupProcess = new EdgeServerStartupProcess( storeFetcher, localDatabase, txPulling, dataSourceManager, new AlwaysChooseFirstServer( hazelcastTopology ), new ConstantTimeRetryStrategy( 1, MILLISECONDS ), NullLogProvider.getInstance(), - mock( EdgeTopologyService.class ), config ); + mock( EdgeTopologyService.class ), new Config( Collections.emptyMap() ) ); // when edgeServerStartupProcess.start(); // then - verify( localDatabase ).copyStoreFrom( coreMember, storeFetcher ); verify( dataSourceManager ).start(); + verify( localDatabase ).isEmpty(); + verify( localDatabase ).stop(); + verify( localDatabase ).copyStoreFrom( coreMember, storeFetcher ); + verify( localDatabase ).start(); verify( txPulling ).start(); + verifyNoMoreInteractions( localDatabase, dataSourceManager, txPulling ); + } + + @Test + public void startShouldNotReplaceTheNonEmptyLocalStoreWithStoreFromCoreServerAndStartPolling() throws Throwable + { + // given + StoreFetcher storeFetcher = mock( StoreFetcher.class ); + LocalDatabase localDatabase = mock( LocalDatabase.class ); + + CoreMember coreMember = new CoreMember( UUID.randomUUID() ); + TopologyService hazelcastTopology = mock( TopologyService.class ); + + ClusterTopology clusterTopology = mock( ClusterTopology.class ); + when( hazelcastTopology.currentTopology() ).thenReturn( clusterTopology ); + + when( clusterTopology.coreMembers() ).thenReturn( asSet( coreMember ) ); + when( localDatabase.isEmpty() ).thenReturn( false ); + + DataSourceManager dataSourceManager = mock( DataSourceManager.class ); + Lifecycle txPulling = mock( Lifecycle.class ); + + EdgeServerStartupProcess edgeServerStartupProcess = new EdgeServerStartupProcess( storeFetcher, localDatabase, + txPulling, dataSourceManager, new AlwaysChooseFirstServer( hazelcastTopology ), + new ConstantTimeRetryStrategy( 1, MILLISECONDS ), NullLogProvider.getInstance(), + mock( EdgeTopologyService.class ), new Config( Collections.emptyMap() ) ); + + // when + try + { + edgeServerStartupProcess.start(); + fail( "shoud have thrown" ); + } + catch ( IllegalStateException ex ) + { + // expected + } + + // then + verify( localDatabase ).isEmpty(); + verify( dataSourceManager ).start(); + verifyNoMoreInteractions( localDatabase, dataSourceManager ); + verifyZeroInteractions( txPulling ); } @Test @@ -117,5 +159,6 @@ txPulling, dataSourceManager, new AlwaysChooseFirstServer( hazelcastTopology ), // then verify( txPulling ).stop(); verify( dataSourceManager ).stop(); + verifyNoMoreInteractions( txPulling, dataSourceManager ); } } diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/scenarios/EdgeServerReplicationIT.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/scenarios/EdgeServerReplicationIT.java index 4b51ec1761cb2..8f64d0c078e9e 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/scenarios/EdgeServerReplicationIT.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/scenarios/EdgeServerReplicationIT.java @@ -19,7 +19,6 @@ */ package org.neo4j.coreedge.scenarios; -import org.junit.Ignore; import org.junit.Rule; import org.junit.Test; @@ -50,15 +49,17 @@ import org.neo4j.kernel.impl.store.MetaDataStore; import org.neo4j.kernel.impl.store.format.highlimit.HighLimit; import org.neo4j.kernel.impl.store.format.standard.StandardV3_0; +import org.neo4j.kernel.lifecycle.LifecycleException; import org.neo4j.logging.Log; -import org.neo4j.test.TestGraphDatabaseFactory; import org.neo4j.test.coreedge.ClusterRule; import static java.util.concurrent.TimeUnit.MINUTES; import static java.util.concurrent.TimeUnit.SECONDS; import static org.hamcrest.CoreMatchers.containsString; +import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.core.Is.is; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; @@ -74,9 +75,8 @@ public class EdgeServerReplicationIT { @Rule - public final ClusterRule clusterRule = new ClusterRule( getClass() ) - .withNumberOfCoreServers( 3 ) - .withNumberOfEdgeServers( 1 ); + public final ClusterRule clusterRule = + new ClusterRule( getClass() ).withNumberOfCoreServers( 3 ).withNumberOfEdgeServers( 1 ); @Test public void shouldNotBeAbleToWriteToEdge() throws Exception @@ -145,7 +145,7 @@ public void shouldEventuallyPullTransactionDownToAllEdgeServers() throws Excepti // then for ( final EdgeServer server : cluster.edgeServers() ) { - GraphDatabaseService edgeDB = server.database(); + GraphDatabaseService edgeDB = server.database(); try ( Transaction tx = edgeDB.beginTx() ) { ThrowingSupplier nodeCount = () -> count( edgeDB.getAllNodes() ); @@ -163,7 +163,6 @@ public void shouldEventuallyPullTransactionDownToAllEdgeServers() throws Excepti } @Test - @Ignore("WIP: Turn this back on once Max/Davide have fixed the Edge Server StoreId stuff") public void shouldShutdownRatherThanPullUpdatesFromCoreServerWithDifferentStoreIfServerHasData() throws Exception { Cluster cluster = clusterRule.withNumberOfEdgeServers( 0 ).startCluster(); @@ -176,16 +175,21 @@ public void shouldShutdownRatherThanPullUpdatesFromCoreServerWithDifferentStoreI } }, cluster ); - EdgeServer edgeServer = cluster.addEdgeServerWithId( 4 ); - putSomeDataWithDifferentStoreId(edgeServer.storeDir(), cluster.getCoreServerById( 0 ).storeDir()); + EdgeServer edgeServer = cluster.addEdgeServerWithId( 4 ); + putSomeDataWithDifferentStoreId( edgeServer.storeDir(), cluster.getCoreServerById( 0 ).storeDir() ); + try { edgeServer.start(); - fail("Should have failed to start"); + fail( "Should have failed to start" ); } - catch ( RuntimeException required ) + catch ( LifecycleException required ) { // Lifecycle should throw exception, server should not start. + assertThat( required.getCause(), instanceOf( LifecycleException.class ) ); + assertThat( required.getCause().getCause(), instanceOf( IllegalStateException.class ) ); + assertThat( required.getCause().getCause().getMessage(), + equalTo( "Local database is not empty cannot copy store" ) ); } } @@ -225,8 +229,8 @@ public void shouldThrowExceptionIfEdgeRecordFormatDiffersToCoreRecordFormat() th } catch ( Exception e ) { - assertThat(e.getCause().getCause().getMessage(), - containsString("Failed to start database with copied store")); + assertThat( e.getCause().getCause().getMessage(), + containsString( "Failed to start database with copied store" ) ); } } @@ -234,17 +238,12 @@ public void shouldThrowExceptionIfEdgeRecordFormatDiffersToCoreRecordFormat() th public void shouldBeAbleToCopyStoresFromCoreToEdge() throws Exception { // given - Map params = stringMap( - CoreEdgeClusterSettings.raft_log_rotation_size.name(), "1k", + Map params = stringMap( CoreEdgeClusterSettings.raft_log_rotation_size.name(), "1k", CoreEdgeClusterSettings.raft_log_pruning_frequency.name(), "500ms", CoreEdgeClusterSettings.state_machine_flush_window_size.name(), "1", - CoreEdgeClusterSettings.raft_log_pruning_strategy.name(), "1 entries" - ); - Cluster cluster = clusterRule - .withNumberOfEdgeServers( 0 ) - .withSharedCoreParams( params ) - .withRecordFormat( HighLimit.NAME ) - .startCluster(); + CoreEdgeClusterSettings.raft_log_pruning_strategy.name(), "1 entries" ); + Cluster cluster = clusterRule.withNumberOfEdgeServers( 0 ).withSharedCoreParams( params ) + .withRecordFormat( HighLimit.NAME ).startCluster(); cluster.coreTx( ( db, tx ) -> { Node node = db.createNode( Label.label( "L" ) ); @@ -271,7 +270,8 @@ public void shouldBeAbleToCopyStoresFromCoreToEdge() throws Exception } File storeDir = coreGraphDatabase.storeDir(); - assertEventually( "pruning happened", () -> versionBy( storeDir, Math::min ), greaterThan( baseVersion ), 1, SECONDS ); + assertEventually( "pruning happened", () -> versionBy( storeDir, Math::min ), greaterThan( baseVersion ), 1, + SECONDS ); // when cluster.addEdgeServerWithIdAndRecordFormat( 42, HighLimit.NAME ).start(); @@ -279,7 +279,8 @@ public void shouldBeAbleToCopyStoresFromCoreToEdge() throws Exception // then for ( final EdgeServer edge : cluster.edgeServers() ) { - assertEventually( "edge server available", () -> edge.database().isAvailable( 0 ), is( true ), 10, SECONDS ); + assertEventually( "edge server available", () -> edge.database().isAvailable( 0 ), is( true ), 10, + SECONDS ); } }