Skip to content

Commit

Permalink
Rewrote ClusterShutdownIT
Browse files Browse the repository at this point in the history
  • Loading branch information
RagnarW committed Apr 30, 2018
1 parent 6366337 commit 3a1950b
Showing 1 changed file with 73 additions and 55 deletions.
Expand Up @@ -19,135 +19,153 @@
*/
package org.neo4j.causalclustering.scenarios;

import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

import org.neo4j.causalclustering.discovery.Cluster;
import org.neo4j.causalclustering.discovery.CoreClusterMember;
import org.neo4j.graphdb.GraphDatabaseService;
import org.neo4j.graphdb.Node;
import org.neo4j.graphdb.Transaction;
import org.neo4j.graphdb.security.WriteOperationsNotAllowedException;
import org.neo4j.test.causalclustering.ClusterRule;

import static java.util.Arrays.asList;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

@RunWith( Parameterized.class )
public class ClusterShutdownIT
{
// Shared cluster fixture: 3 core members, no read replicas. JUnit applies this
// rule around every test method.
// Defect fixed: the field was declared twice (old and new diff variants were both
// present), which is a duplicate-declaration compile error; one declaration kept.
@Rule
public final ClusterRule clusterRule = new ClusterRule().withNumberOfCoreMembers( 3 ).withNumberOfReadReplicas( 0 );

// Order in which core members are shut down, injected per-run by the
// Parameterized runner from shutdownOrders().
@Parameterized.Parameter()
public Collection<Integer> shutdownOrder;
// Cluster under test; assigned in startCluster() (@Before) and, if still
// non-null, torn down in shutdownCluster() (@After).
private Cluster cluster;

/**
 * Supplies every rotation of the three core member ids, so each parameterized
 * run shuts the cluster down in a different member order.
 */
@Parameterized.Parameters( name = "shutdown order {0}" )
public static Collection<Collection<Integer>> shutdownOrders()
{
    Collection<Integer> first = asList( 0, 1, 2 );
    Collection<Integer> second = asList( 1, 2, 0 );
    Collection<Integer> third = asList( 2, 0, 1 );
    return asList( first, second, third );
}

@Test
public void shouldShutdownEvenThoughWaitingForLock() throws Exception
@Before
public void startCluster() throws Exception
{
Cluster cluster = clusterRule.startCluster();
cluster = clusterRule.startCluster();
}

try
@After
public void shutdownCluster()
{
if ( cluster != null )
{
for ( int victimId = 0; victimId < cluster.numberOfCoreMembersReportedByTopology(); victimId++ )
{
assertTrue( cluster.getCoreMemberById( victimId ).database().isAvailable( 1000 ) );
shouldShutdownEvenThoughWaitingForLock0( cluster, victimId, shutdownOrder );
cluster.start();
}
cluster.shutdown();
}
catch ( WriteOperationsNotAllowedException e )
}

/**
 * Shutting the cluster down while writers are blocked on a lock must still
 * complete. Uses the current leader as the victim member.
 */
@Test
public void shouldShutdownEvenThoughWaitingForLock() throws Exception
{
    CoreClusterMember currentLeader = cluster.awaitLeader();
    int victimId = currentLeader.serverId();
    shouldShutdownEvenThoughWaitingForLock0( cluster, victimId, shutdownOrder );
}

private void createANode( AtomicReference<Node> node ) throws Exception
{
cluster.coreTx( ( coreGraphDatabase, transaction ) ->
{
// expected
}
node.set( coreGraphDatabase.createNode() );
transaction.success();
} );
}

private void shouldShutdownEvenThoughWaitingForLock0( Cluster cluster, int victimId, Collection<Integer> shutdownOrder )
throws Exception
private void shouldShutdownEvenThoughWaitingForLock0( Cluster cluster, int victimId, Collection<Integer> shutdownOrder ) throws Exception
{
final int LONG_TIME = 60_000;
final int LONGER_TIME = 2 * LONG_TIME;
final int NUMBER_OF_LOCK_ACQUIRERS = 2;

final ExecutorService txExecutor = Executors.newCachedThreadPool(); // Blocking transactions are executed in
// parallel, not on the main thread.
final ExecutorService shutdownExecutor = Executors.newFixedThreadPool( 1 ); // Shutdowns are executed
// serially, not on the main thread.

final CountDownLatch acquiredLocksCountdown = new CountDownLatch( NUMBER_OF_LOCK_ACQUIRERS );
final CountDownLatch locksHolder = new CountDownLatch( 1 );
final AtomicReference<Node> node = new AtomicReference<>();
final AtomicReference<Exception> txFailure = new AtomicReference<>();

CompletableFuture<Void> preShutdown = new CompletableFuture<>();

// set shutdown order
CompletableFuture<Void> afterShutdown = preShutdown;
for ( Integer id : shutdownOrder )
{
afterShutdown = afterShutdown.thenRunAsync( () -> cluster.getCoreMemberById( id ).shutdown(), shutdownExecutor );
}

createANode( node );

try
{
// when - blocking on lock acquiring
final AtomicReference<Node> someNode = new AtomicReference<>();
final GraphDatabaseService victimDB = cluster.getCoreMemberById( victimId ).database();
final GraphDatabaseService leader = cluster.getCoreMemberById( victimId ).database();

try ( Transaction tx = victimDB.beginTx() )
{
someNode.set( victimDB.createNode() );
tx.success();
}

final AtomicInteger numberOfInstancesReady = new AtomicInteger();
for ( int i = 0; i < NUMBER_OF_LOCK_ACQUIRERS; i++ )
{
txExecutor.execute( () ->
{
try ( Transaction tx = victimDB.beginTx() )
try ( Transaction tx = leader.beginTx() )
{
numberOfInstancesReady.incrementAndGet();

tx.acquireWriteLock( someNode.get() );
Thread.sleep( LONGER_TIME );

tx.acquireWriteLock( node.get() );
acquiredLocksCountdown.countDown();
locksHolder.await();
tx.success();
}
catch ( Exception e )
{
/* Since we are shutting down, a plethora of possible exceptions are expected. */
txFailure.accumulateAndGet( e, ( e1, e2 ) ->
{
if ( e1 != null && e2 != null && !e1.equals( e2 ) )
{
e1.addSuppressed( e2 );
return e1;
}
return e2;
} );
}
} );
}

while ( numberOfInstancesReady.get() < NUMBER_OF_LOCK_ACQUIRERS )
{
Thread.sleep( 100 );
}

final CountDownLatch shutdownLatch = new CountDownLatch( cluster.numberOfCoreMembersReportedByTopology() );
// await locks
acquiredLocksCountdown.await( LONG_TIME, MILLISECONDS );

// then - shutdown in any order should still be possible
for ( final int id : shutdownOrder )
// check for premature failures
Thread.sleep( 100 );
Exception prematureFailure = txFailure.get();
if ( prematureFailure != null )
{
shutdownExecutor.execute( () ->
{
cluster.getCoreMemberById( id ).shutdown();
shutdownLatch.countDown();
} );
throw new RuntimeException( "Failed prematurely", prematureFailure );
}

if ( !shutdownLatch.await( LONG_TIME, MILLISECONDS ) )
{
fail( "Cluster didn't shut down in a timely fashion." );
}
// then shutdown in given order works
preShutdown.complete( null );
afterShutdown.get( LONG_TIME, MILLISECONDS );
}
finally
{
afterShutdown.cancel( true );
locksHolder.countDown();
txExecutor.shutdownNow();
shutdownExecutor.shutdownNow();
}
Expand Down

0 comments on commit 3a1950b

Please sign in to comment.