Skip to content

Commit

Permalink
Shutdown Edge server if the local database is not empty
Browse files Browse the repository at this point in the history
Avoid to copy the store and override the local data.
  • Loading branch information
davidegrohmann committed Jul 4, 2016
1 parent 7668c74 commit d8ab022
Show file tree
Hide file tree
Showing 3 changed files with 107 additions and 48 deletions.
Expand Up @@ -19,13 +19,16 @@
*/ */
package org.neo4j.coreedge.server.edge; package org.neo4j.coreedge.server.edge;


import java.util.concurrent.locks.LockSupport;

import org.neo4j.coreedge.catchup.storecopy.LocalDatabase; import org.neo4j.coreedge.catchup.storecopy.LocalDatabase;
import org.neo4j.coreedge.catchup.storecopy.edge.StoreFetcher; import org.neo4j.coreedge.catchup.storecopy.edge.StoreFetcher;
import org.neo4j.coreedge.discovery.CoreServerSelectionException; import org.neo4j.coreedge.discovery.CoreServerSelectionException;
import org.neo4j.coreedge.discovery.EdgeTopologyService; import org.neo4j.coreedge.discovery.EdgeTopologyService;
import org.neo4j.coreedge.raft.replication.tx.RetryStrategy; import org.neo4j.coreedge.raft.replication.tx.RetryStrategy;
import org.neo4j.coreedge.server.CoreMember; import org.neo4j.coreedge.server.CoreMember;
import org.neo4j.kernel.configuration.Config; import org.neo4j.kernel.configuration.Config;
import org.neo4j.kernel.impl.store.MismatchingStoreIdException;
import org.neo4j.kernel.impl.transaction.state.DataSourceManager; import org.neo4j.kernel.impl.transaction.state.DataSourceManager;
import org.neo4j.kernel.lifecycle.Lifecycle; import org.neo4j.kernel.lifecycle.Lifecycle;
import org.neo4j.logging.Log; import org.neo4j.logging.Log;
Expand Down Expand Up @@ -75,30 +78,42 @@ public void init() throws Throwable
@Override @Override
public void start() throws Throwable public void start() throws Throwable
{ {
boolean copiedStore = false; dataSourceManager.start();
do
if ( localDatabase.isEmpty() )
{
CoreMember transactionServer = findCoreMemberToCopyFrom();
localDatabase.stop();
localDatabase.copyStoreFrom( transactionServer, storeFetcher );
localDatabase.start();
}
else
{
throw new IllegalStateException( "Local database is not empty cannot copy store" );
}

txPulling.start();
}

private CoreMember findCoreMemberToCopyFrom()
{
while ( true )
{ {
try try
{ {
CoreMember transactionServer = connectionStrategy.coreServer(); CoreMember coreMember = connectionStrategy.coreServer();
log.info( "Server starting, connecting to core server at %s", transactionServer.toString() ); log.info( "Server starting, connecting to core server at %s", coreMember.toString() );


discoveryService.registerEdgeServer( extractBoltAddress( config ) ); discoveryService.registerEdgeServer( extractBoltAddress( config ) );

return coreMember;
localDatabase.copyStoreFrom( transactionServer, storeFetcher );
copiedStore = true;
} }
catch ( CoreServerSelectionException ex ) catch ( CoreServerSelectionException ex )
{ {
log.info( "Failed to connect to core server. Retrying in %d ms.", timeout.getMillis() ); log.info( "Failed to connect to core server. Retrying in %d ms.", timeout.getMillis() );
Thread.sleep( timeout.getMillis() ); LockSupport.parkUntil( timeout.getMillis() + System.currentTimeMillis() );
timeout.increment(); timeout.increment();
} }

}
} while ( !copiedStore );

dataSourceManager.start();
txPulling.start();
} }


@Override @Override
Expand Down
Expand Up @@ -21,6 +21,7 @@


import org.junit.Test; import org.junit.Test;


import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.UUID; import java.util.UUID;
Expand All @@ -41,25 +42,20 @@
import org.neo4j.logging.NullLogProvider; import org.neo4j.logging.NullLogProvider;


import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.verifyZeroInteractions;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
import static org.neo4j.helpers.collection.Iterators.asSet; import static org.neo4j.helpers.collection.Iterators.asSet;


public class EdgeServerStartupProcessTest public class EdgeServerStartupProcessTest
{ {
@Test @Test
public void startShouldReplaceLocalStoreWithStoreFromCoreServerAndStartPolling() throws Throwable public void startShouldReplaceTheEmptyLocalStoreWithStoreFromCoreServerAndStartPolling() throws Throwable
{ {
// given // given
final Map<String, String> params = new HashMap<>();

params.put( new GraphDatabaseSettings.BoltConnector( "bolt" ).type.name(), "BOLT" );
params.put( new GraphDatabaseSettings.BoltConnector( "bolt" ).enabled.name(), "true" );
params.put( new GraphDatabaseSettings.BoltConnector( "bolt" ).address.name(), "127.0.0.1:" + 8001 );

Config config = new Config( params );

StoreFetcher storeFetcher = mock( StoreFetcher.class ); StoreFetcher storeFetcher = mock( StoreFetcher.class );
LocalDatabase localDatabase = mock( LocalDatabase.class ); LocalDatabase localDatabase = mock( LocalDatabase.class );


Expand All @@ -78,15 +74,61 @@ public void startShouldReplaceLocalStoreWithStoreFromCoreServerAndStartPolling()
EdgeServerStartupProcess edgeServerStartupProcess = new EdgeServerStartupProcess( storeFetcher, localDatabase, EdgeServerStartupProcess edgeServerStartupProcess = new EdgeServerStartupProcess( storeFetcher, localDatabase,
txPulling, dataSourceManager, new AlwaysChooseFirstServer( hazelcastTopology ), txPulling, dataSourceManager, new AlwaysChooseFirstServer( hazelcastTopology ),
new ConstantTimeRetryStrategy( 1, MILLISECONDS ), NullLogProvider.getInstance(), new ConstantTimeRetryStrategy( 1, MILLISECONDS ), NullLogProvider.getInstance(),
mock( EdgeTopologyService.class ), config ); mock( EdgeTopologyService.class ), new Config( Collections.emptyMap() ) );


// when // when
edgeServerStartupProcess.start(); edgeServerStartupProcess.start();


// then // then
verify( localDatabase ).copyStoreFrom( coreMember, storeFetcher );
verify( dataSourceManager ).start(); verify( dataSourceManager ).start();
verify( localDatabase ).isEmpty();
verify( localDatabase ).stop();
verify( localDatabase ).copyStoreFrom( coreMember, storeFetcher );
verify( localDatabase ).start();
verify( txPulling ).start(); verify( txPulling ).start();
verifyNoMoreInteractions( localDatabase, dataSourceManager, txPulling );
}

@Test
public void startShouldNotReplaceTheNonEmptyLocalStoreWithStoreFromCoreServerAndStartPolling() throws Throwable
{
// given
StoreFetcher storeFetcher = mock( StoreFetcher.class );
LocalDatabase localDatabase = mock( LocalDatabase.class );

CoreMember coreMember = new CoreMember( UUID.randomUUID() );
TopologyService hazelcastTopology = mock( TopologyService.class );

ClusterTopology clusterTopology = mock( ClusterTopology.class );
when( hazelcastTopology.currentTopology() ).thenReturn( clusterTopology );

when( clusterTopology.coreMembers() ).thenReturn( asSet( coreMember ) );
when( localDatabase.isEmpty() ).thenReturn( false );

DataSourceManager dataSourceManager = mock( DataSourceManager.class );
Lifecycle txPulling = mock( Lifecycle.class );

EdgeServerStartupProcess edgeServerStartupProcess = new EdgeServerStartupProcess( storeFetcher, localDatabase,
txPulling, dataSourceManager, new AlwaysChooseFirstServer( hazelcastTopology ),
new ConstantTimeRetryStrategy( 1, MILLISECONDS ), NullLogProvider.getInstance(),
mock( EdgeTopologyService.class ), new Config( Collections.emptyMap() ) );

// when
try
{
edgeServerStartupProcess.start();
fail( "shoud have thrown" );
}
catch ( IllegalStateException ex )
{
// expected
}

// then
verify( localDatabase ).isEmpty();
verify( dataSourceManager ).start();
verifyNoMoreInteractions( localDatabase, dataSourceManager );
verifyZeroInteractions( txPulling );
} }


@Test @Test
Expand Down Expand Up @@ -117,5 +159,6 @@ txPulling, dataSourceManager, new AlwaysChooseFirstServer( hazelcastTopology ),
// then // then
verify( txPulling ).stop(); verify( txPulling ).stop();
verify( dataSourceManager ).stop(); verify( dataSourceManager ).stop();
verifyNoMoreInteractions( txPulling, dataSourceManager );
} }
} }
Expand Up @@ -19,7 +19,6 @@
*/ */
package org.neo4j.coreedge.scenarios; package org.neo4j.coreedge.scenarios;


import org.junit.Ignore;
import org.junit.Rule; import org.junit.Rule;
import org.junit.Test; import org.junit.Test;


Expand Down Expand Up @@ -50,15 +49,17 @@
import org.neo4j.kernel.impl.store.MetaDataStore; import org.neo4j.kernel.impl.store.MetaDataStore;
import org.neo4j.kernel.impl.store.format.highlimit.HighLimit; import org.neo4j.kernel.impl.store.format.highlimit.HighLimit;
import org.neo4j.kernel.impl.store.format.standard.StandardV3_0; import org.neo4j.kernel.impl.store.format.standard.StandardV3_0;
import org.neo4j.kernel.lifecycle.LifecycleException;
import org.neo4j.logging.Log; import org.neo4j.logging.Log;
import org.neo4j.test.TestGraphDatabaseFactory;
import org.neo4j.test.coreedge.ClusterRule; import org.neo4j.test.coreedge.ClusterRule;


import static java.util.concurrent.TimeUnit.MINUTES; import static java.util.concurrent.TimeUnit.MINUTES;
import static java.util.concurrent.TimeUnit.SECONDS; import static java.util.concurrent.TimeUnit.SECONDS;
import static org.hamcrest.CoreMatchers.containsString; import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.core.Is.is; import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
Expand All @@ -74,9 +75,8 @@
public class EdgeServerReplicationIT public class EdgeServerReplicationIT
{ {
@Rule @Rule
public final ClusterRule clusterRule = new ClusterRule( getClass() ) public final ClusterRule clusterRule =
.withNumberOfCoreServers( 3 ) new ClusterRule( getClass() ).withNumberOfCoreServers( 3 ).withNumberOfEdgeServers( 1 );
.withNumberOfEdgeServers( 1 );


@Test @Test
public void shouldNotBeAbleToWriteToEdge() throws Exception public void shouldNotBeAbleToWriteToEdge() throws Exception
Expand Down Expand Up @@ -145,7 +145,7 @@ public void shouldEventuallyPullTransactionDownToAllEdgeServers() throws Excepti
// then // then
for ( final EdgeServer server : cluster.edgeServers() ) for ( final EdgeServer server : cluster.edgeServers() )
{ {
GraphDatabaseService edgeDB = server.database(); GraphDatabaseService edgeDB = server.database();
try ( Transaction tx = edgeDB.beginTx() ) try ( Transaction tx = edgeDB.beginTx() )
{ {
ThrowingSupplier<Long,Exception> nodeCount = () -> count( edgeDB.getAllNodes() ); ThrowingSupplier<Long,Exception> nodeCount = () -> count( edgeDB.getAllNodes() );
Expand All @@ -163,7 +163,6 @@ public void shouldEventuallyPullTransactionDownToAllEdgeServers() throws Excepti
} }


@Test @Test
@Ignore("WIP: Turn this back on once Max/Davide have fixed the Edge Server StoreId stuff")
public void shouldShutdownRatherThanPullUpdatesFromCoreServerWithDifferentStoreIfServerHasData() throws Exception public void shouldShutdownRatherThanPullUpdatesFromCoreServerWithDifferentStoreIfServerHasData() throws Exception
{ {
Cluster cluster = clusterRule.withNumberOfEdgeServers( 0 ).startCluster(); Cluster cluster = clusterRule.withNumberOfEdgeServers( 0 ).startCluster();
Expand All @@ -176,16 +175,21 @@ public void shouldShutdownRatherThanPullUpdatesFromCoreServerWithDifferentStoreI
} }
}, cluster ); }, cluster );


EdgeServer edgeServer = cluster.addEdgeServerWithId( 4 ); EdgeServer edgeServer = cluster.addEdgeServerWithId( 4 );
putSomeDataWithDifferentStoreId(edgeServer.storeDir(), cluster.getCoreServerById( 0 ).storeDir()); putSomeDataWithDifferentStoreId( edgeServer.storeDir(), cluster.getCoreServerById( 0 ).storeDir() );

try try
{ {
edgeServer.start(); edgeServer.start();
fail("Should have failed to start"); fail( "Should have failed to start" );
} }
catch ( RuntimeException required ) catch ( LifecycleException required )
{ {
// Lifecycle should throw exception, server should not start. // Lifecycle should throw exception, server should not start.
assertThat( required.getCause(), instanceOf( LifecycleException.class ) );
assertThat( required.getCause().getCause(), instanceOf( IllegalStateException.class ) );
assertThat( required.getCause().getCause().getMessage(),
equalTo( "Local database is not empty cannot copy store" ) );
} }
} }


Expand Down Expand Up @@ -225,26 +229,21 @@ public void shouldThrowExceptionIfEdgeRecordFormatDiffersToCoreRecordFormat() th
} }
catch ( Exception e ) catch ( Exception e )
{ {
assertThat(e.getCause().getCause().getMessage(), assertThat( e.getCause().getCause().getMessage(),
containsString("Failed to start database with copied store")); containsString( "Failed to start database with copied store" ) );
} }
} }


@Test @Test
public void shouldBeAbleToCopyStoresFromCoreToEdge() throws Exception public void shouldBeAbleToCopyStoresFromCoreToEdge() throws Exception
{ {
// given // given
Map<String,String> params = stringMap( Map<String,String> params = stringMap( CoreEdgeClusterSettings.raft_log_rotation_size.name(), "1k",
CoreEdgeClusterSettings.raft_log_rotation_size.name(), "1k",
CoreEdgeClusterSettings.raft_log_pruning_frequency.name(), "500ms", CoreEdgeClusterSettings.raft_log_pruning_frequency.name(), "500ms",
CoreEdgeClusterSettings.state_machine_flush_window_size.name(), "1", CoreEdgeClusterSettings.state_machine_flush_window_size.name(), "1",
CoreEdgeClusterSettings.raft_log_pruning_strategy.name(), "1 entries" CoreEdgeClusterSettings.raft_log_pruning_strategy.name(), "1 entries" );
); Cluster cluster = clusterRule.withNumberOfEdgeServers( 0 ).withSharedCoreParams( params )
Cluster cluster = clusterRule .withRecordFormat( HighLimit.NAME ).startCluster();
.withNumberOfEdgeServers( 0 )
.withSharedCoreParams( params )
.withRecordFormat( HighLimit.NAME )
.startCluster();


cluster.coreTx( ( db, tx ) -> { cluster.coreTx( ( db, tx ) -> {
Node node = db.createNode( Label.label( "L" ) ); Node node = db.createNode( Label.label( "L" ) );
Expand All @@ -271,15 +270,17 @@ public void shouldBeAbleToCopyStoresFromCoreToEdge() throws Exception
} }


File storeDir = coreGraphDatabase.storeDir(); File storeDir = coreGraphDatabase.storeDir();
assertEventually( "pruning happened", () -> versionBy( storeDir, Math::min ), greaterThan( baseVersion ), 1, SECONDS ); assertEventually( "pruning happened", () -> versionBy( storeDir, Math::min ), greaterThan( baseVersion ), 1,
SECONDS );


// when // when
cluster.addEdgeServerWithIdAndRecordFormat( 42, HighLimit.NAME ).start(); cluster.addEdgeServerWithIdAndRecordFormat( 42, HighLimit.NAME ).start();


// then // then
for ( final EdgeServer edge : cluster.edgeServers() ) for ( final EdgeServer edge : cluster.edgeServers() )
{ {
assertEventually( "edge server available", () -> edge.database().isAvailable( 0 ), is( true ), 10, SECONDS ); assertEventually( "edge server available", () -> edge.database().isAvailable( 0 ), is( true ), 10,
SECONDS );
} }
} }


Expand Down

0 comments on commit d8ab022

Please sign in to comment.