Skip to content

Commit

Permalink
Remake cluster stress tests
Browse files Browse the repository at this point in the history
All the cluster stress tests now share the same common framework
and stress-testing scenarios can be set up by manipulating environment
variables. A few scenarios are run as ITs to pave the way for
the longer stress-runs.
  • Loading branch information
martinfurmanski committed Mar 23, 2018
1 parent c1b8752 commit 9190609
Show file tree
Hide file tree
Showing 36 changed files with 1,644 additions and 830 deletions.
Expand Up @@ -21,6 +21,7 @@

import java.io.File;
import java.io.IOException;
import java.io.OutputStream;

import org.neo4j.backup.impl.OnlineBackupCommandBuilder;
import org.neo4j.causalclustering.core.CausalClusteringSettings;
Expand All @@ -34,6 +35,7 @@

import static org.junit.Assert.assertEquals;
import static org.neo4j.causalclustering.BackupCoreIT.backupArguments;
import static org.neo4j.io.NullOutputStream.NULL_OUTPUT_STREAM;
import static org.neo4j.util.TestHelpers.runBackupToolFromOtherJvmToGetExitCode;

public class BackupUtil
Expand All @@ -51,21 +53,27 @@ public static File createBackupFromCore( CoreClusterMember core, String backupNa
}

public static File createBackupInProcess( ClusterMember<?> member, File baseBackupDir, String backupName ) throws Exception
{
return createBackupInProcess( member, baseBackupDir, backupName, NULL_OUTPUT_STREAM );
}

public static File createBackupInProcess( ClusterMember<?> member, File baseBackupDir, String backupName, OutputStream outputStream ) throws Exception
{
AdvertisedSocketAddress address = member.config().get( CausalClusteringSettings.transaction_advertised_address );
File targetDir = new File( baseBackupDir, backupName );

new OnlineBackupCommandBuilder()
.withOutput( outputStream )
.withHost( address.getHostname() )
.withPort( address.getPort() )
.backup( targetDir );

return targetDir;
}

public static void restoreFromBackup( File backup, FileSystemAbstraction fsa, CoreClusterMember coreClusterMember ) throws IOException, CommandFailed
public static void restoreFromBackup( File backup, FileSystemAbstraction fsa, ClusterMember clusterMember ) throws IOException, CommandFailed
{
Config config = coreClusterMember.config();
Config config = clusterMember.config();
RestoreDatabaseCommand restoreDatabaseCommand = new RestoreDatabaseCommand( fsa, backup, config, "graph-db", true );
restoreDatabaseCommand.execute();
}
Expand Down
Expand Up @@ -32,13 +32,15 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.function.IntFunction;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

import org.neo4j.causalclustering.core.CausalClusteringSettings;
import org.neo4j.causalclustering.core.CoreGraphDatabase;
Expand All @@ -50,7 +52,6 @@
import org.neo4j.causalclustering.readreplica.ReadReplicaGraphDatabase;
import org.neo4j.function.ThrowingSupplier;
import org.neo4j.graphdb.DatabaseShutdownException;
import org.neo4j.graphdb.DependencyResolver;
import org.neo4j.graphdb.Transaction;
import org.neo4j.graphdb.TransactionFailureException;
import org.neo4j.graphdb.security.WriteOperationsNotAllowedException;
Expand Down Expand Up @@ -89,6 +90,8 @@ public class Cluster

private Map<Integer,CoreClusterMember> coreMembers = new ConcurrentHashMap<>();
private Map<Integer,ReadReplica> readReplicas = new ConcurrentHashMap<>();
private int highestCoreServerId;
private int highestReplicaServerId;

public Cluster( File parentDir, int noOfCoreMembers, int noOfReadReplicas,
DiscoveryServiceFactory discoveryServiceFactory,
Expand Down Expand Up @@ -158,6 +161,18 @@ public CoreClusterMember addCoreMemberWithId( int memberId )
return addCoreMemberWithId( memberId, coreParams, instanceCoreParams, recordFormat );
}

public CoreClusterMember newCoreMember()
{
int newCoreServerId = ++highestCoreServerId;
return addCoreMemberWithId( newCoreServerId );
}

public ReadReplica newReadReplica()
{
int newReplicaServerId = ++highestReplicaServerId;
return addReadReplicaWithId( newReplicaServerId );
}

private CoreClusterMember addCoreMemberWithId( int memberId, Map<String,String> extraParams,
Map<String,IntFunction<String>> instanceExtraParams, String recordFormat )
{
Expand Down Expand Up @@ -436,8 +451,6 @@ private CoreClusterMember leaderTx( String dbName, BiConsumer<CoreGraphDatabase,
{
if ( isTransientFailure( e ) )
{
// this is not the best, but it helps in debugging
e.printStackTrace();
return null;
}
else
Expand Down Expand Up @@ -505,6 +518,7 @@ private void createCoreMembers( final int noOfCoreMembers,
);
coreMembers.put( i, coreClusterMember );
}
highestCoreServerId = noOfCoreMembers - 1;
}

protected CoreClusterMember createCoreClusterMember( int serverId,
Expand Down Expand Up @@ -617,6 +631,7 @@ private void createReadReplicas( int noOfReadReplicas,

readReplicas.put( i, readReplica );
}
highestReplicaServerId = noOfReadReplicas - 1;
}

private void shutdownReadReplicas( ErrorHandler errorHandler )
Expand Down Expand Up @@ -703,4 +718,24 @@ public ClusterMember getMemberByBoltAddress( AdvertisedSocketAddress advertisedS

throw new RuntimeException( "Could not find a member for bolt address " + advertisedSocketAddress );
}

public Optional<ClusterMember> randomMember( boolean mustBeStarted )
{
Stream<ClusterMember> members = Stream.concat( coreMembers().stream(), readReplicas().stream() );

if ( mustBeStarted )
{
members = members.filter( m -> !m.isShutdown() );
}

List<ClusterMember> eligible = members.collect( Collectors.toList() );

if ( eligible.size() == 0 )
{
return Optional.empty();
}

int ordinal = ThreadLocalRandom.current().nextInt( eligible.size() );
return Optional.of( eligible.get( ordinal ) );
}
}
Expand Up @@ -19,6 +19,8 @@
*/
package org.neo4j.causalclustering.discovery;

import java.io.File;

import org.neo4j.kernel.configuration.Config;
import org.neo4j.kernel.internal.GraphDatabaseAPI;
import org.neo4j.kernel.monitoring.Monitors;
Expand All @@ -29,6 +31,8 @@ public interface ClusterMember<T extends GraphDatabaseAPI>

void shutdown();

boolean isShutdown();

T database();

ClientConnectorAddresses clientConnectorAddresses();
Expand All @@ -50,4 +54,6 @@ public interface ClusterMember<T extends GraphDatabaseAPI>
ThreadGroup threadGroup();

Monitors monitors();

File storeDir();
}
Expand Up @@ -180,17 +180,30 @@ public void shutdown()
{
if ( database != null )
{
database.shutdown();
database = null;
try
{
database.shutdown();
}
finally
{
database = null;
}
}
}

@Override
public boolean isShutdown()
{
return database == null;
}

@Override
public CoreGraphDatabase database()
{
return database;
}

@Override
public File storeDir()
{
return storeDir;
Expand Down
Expand Up @@ -135,11 +135,23 @@ public void shutdown()
{
if ( database != null )
{
database.shutdown();
database = null;
try
{
database.shutdown();
}
finally
{
database = null;
}
}
}

@Override
public boolean isShutdown()
{
return database == null;
}

public CatchupPollingProcess txPollingClient()
{
return database.getDependencyResolver().resolveDependency( CatchupPollingProcess.class );
Expand Down Expand Up @@ -175,6 +187,7 @@ public Monitors monitors()
return monitors;
}

@Override
public File storeDir()
{
return storeDir;
Expand Down
4 changes: 2 additions & 2 deletions stresstests/src/test/java/org/neo4j/backup/BackupHelper.java
Expand Up @@ -32,7 +32,7 @@
import org.neo4j.function.Predicates;
import org.neo4j.helper.IsChannelClosedException;
import org.neo4j.helper.IsConnectionException;
import org.neo4j.helper.IsConnectionRestByPeer;
import org.neo4j.helper.IsConnectionResetByPeer;
import org.neo4j.helper.IsStoreClosed;
import org.neo4j.io.IOUtils;
import org.neo4j.kernel.configuration.Config;
Expand All @@ -42,7 +42,7 @@ public class BackupHelper

private static final Predicate<Throwable> isTransientError = Predicates.any(
new IsConnectionException(),
new IsConnectionRestByPeer(),
new IsConnectionResetByPeer(),
new IsChannelClosedException(),
new IsStoreClosed() );

Expand Down
Expand Up @@ -22,22 +22,22 @@
import java.nio.file.Path;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;
import java.util.function.BooleanSupplier;

import org.neo4j.backup.BackupHelper;
import org.neo4j.backup.BackupResult;
import org.neo4j.helper.RepeatUntilCallable;
import org.neo4j.causalclustering.stresstests.Control;
import org.neo4j.helper.Workload;

class BackupLoad extends RepeatUntilCallable
class BackupLoad extends Workload
{

private final String backupHostname;
private final int backupPort;
private final Path backupDir;

BackupLoad( BooleanSupplier keepGoing, Runnable onFailure, String backupHostname, int backupPort, Path backupDir )
BackupLoad( Control control, String backupHostname, int backupPort, Path backupDir )
{
super( keepGoing, onFailure );
super( control );
this.backupHostname = backupHostname;
this.backupPort = backupPort;
this.backupDir = backupDir;
Expand Down
Expand Up @@ -29,10 +29,10 @@
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BooleanSupplier;

import org.neo4j.causalclustering.stresstests.Config;
import org.neo4j.causalclustering.stresstests.Control;
import org.neo4j.concurrent.Futures;
import org.neo4j.graphdb.GraphDatabaseService;
import org.neo4j.graphdb.factory.GraphDatabaseBuilder;
Expand All @@ -48,7 +48,6 @@
import static java.util.concurrent.TimeUnit.MINUTES;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.junit.Assert.fail;
import static org.neo4j.function.Suppliers.untilTimeExpired;
import static org.neo4j.helper.DatabaseConfiguration.configureBackup;
import static org.neo4j.helper.DatabaseConfiguration.configureTxLogRotationAndPruning;
import static org.neo4j.helper.StressTestingHelper.ensureExistsAndEmpty;
Expand Down Expand Up @@ -91,10 +90,7 @@ public void shouldBehaveCorrectlyUnderStress() throws Exception
new GraphDatabaseFactory().newEmbeddedDatabaseBuilder( storeDirectory.getAbsoluteFile() )
.setConfig( config );

final AtomicBoolean stopTheWorld = new AtomicBoolean();
BooleanSupplier notExpired = untilTimeExpired( durationInMinutes, MINUTES );
Runnable onFailure = () -> stopTheWorld.set( true );
BooleanSupplier keepGoingSupplier = () -> !stopTheWorld.get() && notExpired.getAsBoolean();
Control control = new Control( new Config() );

AtomicReference<GraphDatabaseService> dbRef = new AtomicReference<>();
ExecutorService service = Executors.newFixedThreadPool( 3 );
Expand All @@ -103,13 +99,11 @@ public void shouldBehaveCorrectlyUnderStress() throws Exception
dbRef.set( graphDatabaseBuilder.newGraphDatabase() );
if ( enableIndexes )
{
WorkLoad.setupIndexes( dbRef.get() );
TransactionalWorkload.setupIndexes( dbRef.get() );
}
Future<?> workload = service.submit( new WorkLoad( keepGoingSupplier, onFailure, dbRef::get ) );
Future<?> backupWorker = service.submit(
new BackupLoad( keepGoingSupplier, onFailure, backupHostname, backupPort, workDirectory ) );
Future<?> startStopWorker = service.submit(
new StartStop( keepGoingSupplier, onFailure, graphDatabaseBuilder::newGraphDatabase, dbRef ) );
Future<?> workload = service.submit( new TransactionalWorkload( control, dbRef::get ) );
Future<?> backupWorker = service.submit( new BackupLoad( control, backupHostname, backupPort, workDirectory ) );
Future<?> startStopWorker = service.submit( new StartStop( control, graphDatabaseBuilder::newGraphDatabase, dbRef ) );

Futures.combine( workload, backupWorker, startStopWorker ).get(durationInMinutes + 5, MINUTES );

Expand Down
Expand Up @@ -19,27 +19,25 @@
*/
package org.neo4j.backup.stresstests;


import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.LockSupport;
import java.util.function.BooleanSupplier;

import org.neo4j.causalclustering.stresstests.Control;
import org.neo4j.function.Factory;
import org.neo4j.graphdb.GraphDatabaseService;
import org.neo4j.helper.RepeatUntilCallable;
import org.neo4j.helper.Workload;

import static org.junit.Assert.assertTrue;

class StartStop extends RepeatUntilCallable
class StartStop extends Workload
{
private final AtomicReference<GraphDatabaseService> dbRef;
private final Factory<GraphDatabaseService> factory;

StartStop( BooleanSupplier keepGoing, Runnable onFailure, Factory<GraphDatabaseService> factory,
AtomicReference<GraphDatabaseService> dbRef )
StartStop( Control control, Factory<GraphDatabaseService> factory, AtomicReference<GraphDatabaseService> dbRef )
{
super( keepGoing, onFailure );
super( control );
this.factory = factory;
this.dbRef = dbRef;
}
Expand Down

0 comments on commit 9190609

Please sign in to comment.