Skip to content

Commit

Permalink
Simplify test code, make it more robust, and improve error messages
Browse files Browse the repository at this point in the history
  • Loading branch information
davidegrohmann committed Aug 25, 2016
1 parent f38418f commit 42c990c
Showing 1 changed file with 64 additions and 87 deletions.
151 changes: 64 additions & 87 deletions enterprise/ha/src/test/java/org/neo4j/ha/UpdatePullerSwitchIT.java
Expand Up @@ -19,12 +19,13 @@
*/
package org.neo4j.ha;

import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;

import java.util.Arrays;
import java.util.Map;
import java.util.Set;
import java.util.Optional;
import java.util.function.Function;

import org.neo4j.cluster.ClusterSettings;
import org.neo4j.cluster.InstanceId;
Expand All @@ -34,142 +35,118 @@
import org.neo4j.graphdb.Transaction;
import org.neo4j.helpers.collection.Iterators;
import org.neo4j.kernel.configuration.Config;
import org.neo4j.kernel.ha.HaSettings;
import org.neo4j.kernel.ha.HighlyAvailableGraphDatabase;
import org.neo4j.kernel.ha.SlaveUpdatePuller;
import org.neo4j.kernel.ha.UpdatePuller;
import org.neo4j.kernel.impl.ha.ClusterManager;
import org.neo4j.test.ha.ClusterRule;

import static java.lang.String.format;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.neo4j.kernel.ha.HaSettings.tx_push_factor;
import static org.neo4j.kernel.ha.SlaveUpdatePuller.UPDATE_PULLER_THREAD_PREFIX;
import static org.neo4j.kernel.impl.ha.ClusterManager.allSeesAllAsAvailable;
import static org.neo4j.kernel.impl.ha.ClusterManager.clusterOfSize;
import static org.neo4j.kernel.impl.ha.ClusterManager.masterAvailable;

public class UpdatePullerSwitchIT
{
@Rule
public final ClusterRule clusterRule = new ClusterRule( getClass() );
private ClusterManager.ManagedCluster managedCluster;

@Before
public void setup() throws Exception
{
managedCluster = clusterRule.withCluster( ClusterManager.clusterOfSize( 2 ) )
.withSharedSetting( tx_push_factor, "0" )
.withSharedSetting( HaSettings.pull_interval, "100s" )
.withFirstInstanceId( 6 )
.startCluster();
}
public final ClusterRule clusterRule = new ClusterRule( getClass() ).withCluster( clusterOfSize( 2 ) )
.withSharedSetting( tx_push_factor, "0" );

@Test
public void updatePullerSwitchOnNodeModeSwitch() throws Throwable
{
String masterLabel = "masterLabel";
createLabeledNodeOnMaster( masterLabel );
ClusterManager.ManagedCluster cluster = clusterRule.startCluster();

Label firstLabel = Label.label( "firstLabel" );
createLabeledNodeOnMaster( cluster, firstLabel );
// force update puller to work
pullUpdatesOnSlave();
pullUpdatesOnSlave( cluster );
// node should exist on slave now
checkLabeledNodeExistanceOnSlave( masterLabel );
checkLabeledNodeExistanceOnSlave( cluster, firstLabel );
// verify that puller working on slave and not working on master
verifyUpdatePullerThreads();

// switch roles in cluster - now update puller should be stopped on old slave and start on old master.
ClusterManager.RepairKit initialMasterRepairKit = managedCluster.shutdown( managedCluster.getMaster() );
managedCluster.await( ClusterManager.masterAvailable() );
verifyUpdatePullerThreads( cluster );

String pretenderMasterLabel = "pretenderMasterLabel";
createLabeledNodeOnMaster( pretenderMasterLabel );

initialMasterRepairKit.repair();
managedCluster.await( ClusterManager.masterSeesSlavesAsAvailable( 1 ) );
for ( int i = 1; i <= 2; i++ )
{
// switch roles in cluster - now update puller should be stopped on old slave and start on old master.
ClusterManager.RepairKit repairKit = cluster.shutdown( cluster.getMaster() );
cluster.await( masterAvailable() );

// forcing updates pulling
pullUpdatesOnSlave();
checkLabeledNodeExistanceOnSlave( pretenderMasterLabel );
// checking pulling threads
verifyUpdatePullerThreads();
Label currentLabel = Label.label( "label_" + i );

createLabeledNodeOnMaster( cluster, currentLabel );

// and finally switching roles back
ClusterManager.RepairKit justiceRepairKit = managedCluster.shutdown( managedCluster.getMaster() );
managedCluster.await( ClusterManager.masterAvailable() );
repairKit.repair();
cluster.await( allSeesAllAsAvailable(), 120 );

String justicePrevailedLabel = "justice prevailed";
createLabeledNodeOnMaster( justicePrevailedLabel );
// forcing updates pulling
pullUpdatesOnSlave( cluster );
checkLabeledNodeExistanceOnSlave( cluster, currentLabel );
// checking pulling threads
verifyUpdatePullerThreads( cluster );
}
}

justiceRepairKit.repair();
managedCluster.await( ClusterManager.masterSeesSlavesAsAvailable( 1 ) );
private void verifyUpdatePullerThreads( ClusterManager.ManagedCluster cluster )
{
Map<Thread,StackTraceElement[]> threads = Thread.getAllStackTraces();
Optional<Map.Entry<Thread,StackTraceElement[]>> masterEntry =
findThreadWithPrefix( threads, UPDATE_PULLER_THREAD_PREFIX + serverId( cluster.getMaster() ) );
assertFalse( format( "Found an update puller on master.%s", masterEntry.map( this::prettyPrint ).orElse( "" ) ),
masterEntry.isPresent() );

Optional<Map.Entry<Thread,StackTraceElement[]>> slaveEntry =
findThreadWithPrefix( threads, UPDATE_PULLER_THREAD_PREFIX + serverId( cluster.getAnySlave() ) );
assertTrue( "Found no update puller on slave", slaveEntry.isPresent() );
}

// forcing pull updates
pullUpdatesOnSlave();
checkLabeledNodeExistanceOnSlave( justicePrevailedLabel );
// checking pulling threads
verifyUpdatePullerThreads();
private String prettyPrint( Map.Entry<Thread,StackTraceElement[]> entry )
{
return format( "\n\tThread: %s\n\tStackTrace: %s", entry.getKey(), Arrays.toString( entry.getValue() ) );
}

private void verifyUpdatePullerThreads()
private InstanceId serverId( HighlyAvailableGraphDatabase db )
{
InstanceId masterId = managedCluster.getMaster().getDependencyResolver().resolveDependency( Config.class ).get( ClusterSettings.server_id );
InstanceId slaveId = managedCluster.getAnySlave().getDependencyResolver().resolveDependency( Config.class ).get( ClusterSettings.server_id );
Map<Thread,StackTraceElement[]> allStackTraces = Thread.getAllStackTraces();
Set<Thread> threads = allStackTraces.keySet();
assertNull( "Master should not have any puller threads", findThreadWithPrefix( threads,
SlaveUpdatePuller.UPDATE_PULLER_THREAD_PREFIX + masterId ) );
assertNotNull( "Slave should have active puller thread", findThreadWithPrefix( threads,
SlaveUpdatePuller.UPDATE_PULLER_THREAD_PREFIX + slaveId ) );
return db.getDependencyResolver().resolveDependency( Config.class ).get( ClusterSettings.server_id );
}

/*
* Returns the name, as a String, of first thread found that has a name starting with the provided prefix,
* null otherwise.
*/
private String findThreadWithPrefix( Set<Thread> threads, String prefix )
private Optional<Map.Entry<Thread,StackTraceElement[]>> findThreadWithPrefix(
Map<Thread,StackTraceElement[]> threads, String prefix )
{
for ( Thread thread : threads )
{
if ( thread.getName().startsWith( prefix ) )
{
return thread.getName();
}
}
return null;
return threads.entrySet().stream()
.filter( entry -> entry.getKey().getName().startsWith( prefix ) ).findFirst();
}

private void pullUpdatesOnSlave() throws InterruptedException
private void pullUpdatesOnSlave( ClusterManager.ManagedCluster cluster ) throws InterruptedException
{
UpdatePuller updatePuller =
managedCluster.getAnySlave().getDependencyResolver().resolveDependency( UpdatePuller.class );
cluster.getAnySlave().getDependencyResolver().resolveDependency( UpdatePuller.class );
assertTrue( "We should always have some updates to pull", updatePuller.tryPullUpdates() );
}

private void checkLabeledNodeExistanceOnSlave( String label )
private void checkLabeledNodeExistanceOnSlave( ClusterManager.ManagedCluster cluster, Label label )
{
// since we have only 2 nodes in cluster its safe to call get any cluster
HighlyAvailableGraphDatabase slave = managedCluster.getAnySlave();
HighlyAvailableGraphDatabase slave = cluster.getAnySlave();
try ( Transaction transaction = slave.beginTx() )
{
checkNodeWithLabelExists( slave, label );
ResourceIterator<Node> slaveNodes = slave.findNodes( label );
assertEquals( 1, Iterators.asList( slaveNodes ).size() );
transaction.success();
}

}

private void createLabeledNodeOnMaster( String label )
private void createLabeledNodeOnMaster( ClusterManager.ManagedCluster cluster, Label label )
{
HighlyAvailableGraphDatabase master = managedCluster.getMaster();
HighlyAvailableGraphDatabase master = cluster.getMaster();
try ( Transaction transaction = master.beginTx() )
{
Node masterNode = master.createNode();
masterNode.addLabel( Label.label( label ) );
masterNode.addLabel( label );
transaction.success();
}
}

private void checkNodeWithLabelExists( HighlyAvailableGraphDatabase database, String label )
{
ResourceIterator<Node> slaveNodes = database.findNodes( Label.label( label ) );
assertEquals( 1, Iterators.asList( slaveNodes ).size() );
}
}

0 comments on commit 42c990c

Please sign in to comment.