diff --git a/enterprise/ha/src/test/java/org/neo4j/ha/UpdatePullerSwitchIT.java b/enterprise/ha/src/test/java/org/neo4j/ha/UpdatePullerSwitchIT.java index 83a6be27f44c3..c2c8fd7da7004 100644 --- a/enterprise/ha/src/test/java/org/neo4j/ha/UpdatePullerSwitchIT.java +++ b/enterprise/ha/src/test/java/org/neo4j/ha/UpdatePullerSwitchIT.java @@ -19,12 +19,13 @@ */ package org.neo4j.ha; -import org.junit.Before; import org.junit.Rule; import org.junit.Test; +import java.util.Arrays; import java.util.Map; -import java.util.Set; +import java.util.Optional; +import java.util.function.Function; import org.neo4j.cluster.ClusterSettings; import org.neo4j.cluster.InstanceId; @@ -34,142 +35,118 @@ import org.neo4j.graphdb.Transaction; import org.neo4j.helpers.collection.Iterators; import org.neo4j.kernel.configuration.Config; -import org.neo4j.kernel.ha.HaSettings; import org.neo4j.kernel.ha.HighlyAvailableGraphDatabase; -import org.neo4j.kernel.ha.SlaveUpdatePuller; import org.neo4j.kernel.ha.UpdatePuller; import org.neo4j.kernel.impl.ha.ClusterManager; import org.neo4j.test.ha.ClusterRule; +import static java.lang.String.format; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.neo4j.kernel.ha.HaSettings.tx_push_factor; +import static org.neo4j.kernel.ha.SlaveUpdatePuller.UPDATE_PULLER_THREAD_PREFIX; +import static org.neo4j.kernel.impl.ha.ClusterManager.allSeesAllAsAvailable; +import static org.neo4j.kernel.impl.ha.ClusterManager.clusterOfSize; +import static org.neo4j.kernel.impl.ha.ClusterManager.masterAvailable; public class UpdatePullerSwitchIT { @Rule - public final ClusterRule clusterRule = new ClusterRule( getClass() ); - private ClusterManager.ManagedCluster managedCluster; - - @Before - public void setup() throws Exception - { - managedCluster = clusterRule.withCluster( ClusterManager.clusterOfSize( 2 ) ) - .withSharedSetting( tx_push_factor, "0" ) - .withSharedSetting( HaSettings.pull_interval, "100s" ) - .withFirstInstanceId( 6 ) - .startCluster(); - } + public final ClusterRule clusterRule = new ClusterRule( getClass() ).withCluster( clusterOfSize( 2 ) ) + .withSharedSetting( tx_push_factor, "0" ); @Test public void updatePullerSwitchOnNodeModeSwitch() throws Throwable { - String masterLabel = "masterLabel"; - createLabeledNodeOnMaster( masterLabel ); + ClusterManager.ManagedCluster cluster = clusterRule.startCluster(); + + Label firstLabel = Label.label( "firstLabel" ); + createLabeledNodeOnMaster( cluster, firstLabel ); // force update puller to work - pullUpdatesOnSlave(); + pullUpdatesOnSlave( cluster ); // node should exist on slave now - checkLabeledNodeExistanceOnSlave( masterLabel ); + checkLabeledNodeExistanceOnSlave( cluster, firstLabel ); // verify that puller working on slave and not working on master - verifyUpdatePullerThreads(); - - // switch roles in cluster - now update puller should be stopped on old slave and start on old master. - ClusterManager.RepairKit initialMasterRepairKit = managedCluster.shutdown( managedCluster.getMaster() ); - managedCluster.await( ClusterManager.masterAvailable() ); + verifyUpdatePullerThreads( cluster ); - String pretenderMasterLabel = "pretenderMasterLabel"; - createLabeledNodeOnMaster( pretenderMasterLabel ); - - initialMasterRepairKit.repair(); - managedCluster.await( ClusterManager.masterSeesSlavesAsAvailable( 1 ) ); + for ( int i = 1; i <= 2; i++ ) + { + // switch roles in cluster - now update puller should be stopped on old slave and start on old master. + ClusterManager.RepairKit repairKit = cluster.shutdown( cluster.getMaster() ); + cluster.await( masterAvailable() ); - // forcing updates pulling - pullUpdatesOnSlave(); - checkLabeledNodeExistanceOnSlave( pretenderMasterLabel ); - // checking pulling threads - verifyUpdatePullerThreads(); + Label currentLabel = Label.label( "label_" + i ); + createLabeledNodeOnMaster( cluster, currentLabel ); - // and finally switching roles back - ClusterManager.RepairKit justiceRepairKit = managedCluster.shutdown( managedCluster.getMaster() ); - managedCluster.await( ClusterManager.masterAvailable() ); + repairKit.repair(); + cluster.await( allSeesAllAsAvailable(), 120 ); - String justicePrevailedLabel = "justice prevailed"; - createLabeledNodeOnMaster( justicePrevailedLabel ); + // forcing updates pulling + pullUpdatesOnSlave( cluster ); + checkLabeledNodeExistanceOnSlave( cluster, currentLabel ); + // checking pulling threads + verifyUpdatePullerThreads( cluster ); + } + } - justiceRepairKit.repair(); - managedCluster.await( ClusterManager.masterSeesSlavesAsAvailable( 1 ) ); + private void verifyUpdatePullerThreads( ClusterManager.ManagedCluster cluster ) + { + Map threads = Thread.getAllStackTraces(); + Optional> masterEntry = + findThreadWithPrefix( threads, UPDATE_PULLER_THREAD_PREFIX + serverId( cluster.getMaster() ) ); + assertFalse( format( "Found an update puller on master.%s", masterEntry.map( this::prettyPrint ).orElse( "" ) ), + masterEntry.isPresent() ); + + Optional> slaveEntry = + findThreadWithPrefix( threads, UPDATE_PULLER_THREAD_PREFIX + serverId( cluster.getAnySlave() ) ); + assertTrue( "Found no update puller on slave", slaveEntry.isPresent() ); + } - // forcing pull updates - pullUpdatesOnSlave(); - checkLabeledNodeExistanceOnSlave( justicePrevailedLabel ); - // checking pulling threads - verifyUpdatePullerThreads(); + private String prettyPrint( Map.Entry entry ) + { + return format( "\n\tThread: %s\n\tStackTrace: %s", entry.getKey(), Arrays.toString( entry.getValue() ) ); } - private void verifyUpdatePullerThreads() + private InstanceId serverId( HighlyAvailableGraphDatabase db ) { - InstanceId masterId = managedCluster.getMaster().getDependencyResolver().resolveDependency( Config.class ).get( ClusterSettings.server_id ); - InstanceId slaveId = managedCluster.getAnySlave().getDependencyResolver().resolveDependency( Config.class ).get( ClusterSettings.server_id ); - Map allStackTraces = Thread.getAllStackTraces(); - Set threads = allStackTraces.keySet(); - assertNull( "Master should not have any puller threads", findThreadWithPrefix( threads, - SlaveUpdatePuller.UPDATE_PULLER_THREAD_PREFIX + masterId ) ); - assertNotNull( "Slave should have active puller thread", findThreadWithPrefix( threads, - SlaveUpdatePuller.UPDATE_PULLER_THREAD_PREFIX + slaveId ) ); + return db.getDependencyResolver().resolveDependency( Config.class ).get( ClusterSettings.server_id ); } - /* - * Returns the name, as a String, of first thread found that has a name starting with the provided prefix, - * null otherwise. - */ - private String findThreadWithPrefix( Set threads, String prefix ) + private Optional> findThreadWithPrefix( + Map threads, String prefix ) { - for ( Thread thread : threads ) - { - if ( thread.getName().startsWith( prefix ) ) - { - return thread.getName(); - } - } - return null; + return threads.entrySet().stream() + .filter( entry -> entry.getKey().getName().startsWith( prefix ) ).findFirst(); } - private void pullUpdatesOnSlave() throws InterruptedException + private void pullUpdatesOnSlave( ClusterManager.ManagedCluster cluster ) throws InterruptedException { UpdatePuller updatePuller = - managedCluster.getAnySlave().getDependencyResolver().resolveDependency( UpdatePuller.class ); + cluster.getAnySlave().getDependencyResolver().resolveDependency( UpdatePuller.class ); assertTrue( "We should always have some updates to pull", updatePuller.tryPullUpdates() ); } - private void checkLabeledNodeExistanceOnSlave( String label ) + private void checkLabeledNodeExistanceOnSlave( ClusterManager.ManagedCluster cluster, Label label ) { - // since we have only 2 nodes in cluster its safe to call get any cluster - HighlyAvailableGraphDatabase slave = managedCluster.getAnySlave(); + HighlyAvailableGraphDatabase slave = cluster.getAnySlave(); try ( Transaction transaction = slave.beginTx() ) { - checkNodeWithLabelExists( slave, label ); + ResourceIterator slaveNodes = slave.findNodes( label ); + assertEquals( 1, Iterators.asList( slaveNodes ).size() ); + transaction.success(); } - } - private void createLabeledNodeOnMaster( String label ) + private void createLabeledNodeOnMaster( ClusterManager.ManagedCluster cluster, Label label ) { - HighlyAvailableGraphDatabase master = managedCluster.getMaster(); + HighlyAvailableGraphDatabase master = cluster.getMaster(); try ( Transaction transaction = master.beginTx() ) { Node masterNode = master.createNode(); - masterNode.addLabel( Label.label( label ) ); + masterNode.addLabel( label ); transaction.success(); } } - - private void checkNodeWithLabelExists( HighlyAvailableGraphDatabase database, String label ) - { - ResourceIterator slaveNodes = database.findNodes( Label.label( label ) ); - assertEquals( 1, Iterators.asList( slaveNodes ).size() ); - } }