From 822061d94b98a3a48de2dca46267c38d4e232ecf Mon Sep 17 00:00:00 2001 From: Davide Grohmann Date: Tue, 13 Sep 2016 13:18:08 +0200 Subject: [PATCH] Add core-edge stress test for backup/store copy interaction --- .../java/org/neo4j/function/Suppliers.java | 10 + .../TransactionAppenderStressTest.java | 9 +- .../org/neo4j/backup/BackupServiceIT.java | 2 +- .../BackupServiceStressTestingBuilder.java | 6 - .../coreedge/discovery/ClusterMember.java | 27 ++ .../coreedge/discovery/CoreClusterMember.java | 4 +- .../coreedge/discovery/EdgeClusterMember.java | 4 +- stresstests/pom.xml | 13 + .../BackupServiceStressTesting.java | 4 +- ...ckupStoreCopyInteractionStressTesting.java | 257 ++++++++++++++++++ .../log/TransactionAppenderStressTesting.java | 3 +- 11 files changed, 319 insertions(+), 20 deletions(-) create mode 100644 enterprise/core-edge/src/test/java/org/neo4j/coreedge/discovery/ClusterMember.java create mode 100644 stresstests/src/test/java/org/neo4j/coreedge/stresstests/BackupStoreCopyInteractionStressTesting.java diff --git a/community/common/src/main/java/org/neo4j/function/Suppliers.java b/community/common/src/main/java/org/neo4j/function/Suppliers.java index 5b818e5fc3b2..e6fc3d8fc8da 100644 --- a/community/common/src/main/java/org/neo4j/function/Suppliers.java +++ b/community/common/src/main/java/org/neo4j/function/Suppliers.java @@ -19,10 +19,14 @@ */ package org.neo4j.function; +import java.util.concurrent.TimeUnit; +import java.util.function.BooleanSupplier; import java.util.function.Function; import java.util.function.Predicate; import java.util.function.Supplier; +import static java.lang.System.currentTimeMillis; + /** * Constructors for basic {@link Supplier} types */ @@ -118,4 +122,10 @@ public static Supplier compose( final Supplier input, final Pred { return () -> predicate.test( input.get() ); } + + public static BooleanSupplier untilTimeExpired( long duration, TimeUnit unit ) + { + final long endTimeInMilliseconds = currentTimeMillis() + unit.toMillis( duration ); + return () -> currentTimeMillis() <= endTimeInMilliseconds; + } } diff --git a/community/kernel/src/test/java/org/neo4j/kernel/impl/transaction/log/stresstest/TransactionAppenderStressTest.java b/community/kernel/src/test/java/org/neo4j/kernel/impl/transaction/log/stresstest/TransactionAppenderStressTest.java index c793eef137c6..494777b9f298 100644 --- a/community/kernel/src/test/java/org/neo4j/kernel/impl/transaction/log/stresstest/TransactionAppenderStressTest.java +++ b/community/kernel/src/test/java/org/neo4j/kernel/impl/transaction/log/stresstest/TransactionAppenderStressTest.java @@ -47,6 +47,7 @@ import static java.lang.System.currentTimeMillis; import static java.util.concurrent.TimeUnit.SECONDS; import static org.junit.Assert.assertEquals; +import static org.neo4j.function.Suppliers.untilTimeExpired; public class TransactionAppenderStressTest { @@ -59,7 +60,7 @@ public void concurrentTransactionAppendingTest() throws Exception int threads = 10; File workingDirectory = directory.directory( "work" ); Callable runner = new Builder() - .with( Builder.untilTimeExpired( 10, SECONDS ) ) + .with( untilTimeExpired( 10, SECONDS ) ) .withWorkingDirectory( workingDirectory ) .withNumThreads( threads ) .build(); @@ -75,12 +76,6 @@ public static class Builder private File workingDirectory; private int threads; - public static BooleanSupplier untilTimeExpired( long duration, TimeUnit unit ) - { - final long endTimeInMilliseconds = currentTimeMillis() + unit.toMillis( duration ); - return () -> currentTimeMillis() <= endTimeInMilliseconds; - } - public Builder with( BooleanSupplier condition ) { this.condition = condition; diff --git a/enterprise/backup/src/test/java/org/neo4j/backup/BackupServiceIT.java b/enterprise/backup/src/test/java/org/neo4j/backup/BackupServiceIT.java index 7b612ab2eadc..33b22c07156f 100644 --- a/enterprise/backup/src/test/java/org/neo4j/backup/BackupServiceIT.java +++ b/enterprise/backup/src/test/java/org/neo4j/backup/BackupServiceIT.java @@ -105,7 +105,7 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -import static org.neo4j.backup.BackupServiceStressTestingBuilder.untilTimeExpired; +import static org.neo4j.function.Suppliers.untilTimeExpired; import static org.neo4j.kernel.impl.storemigration.StoreFile.COUNTS_STORE_LEFT; import static org.neo4j.kernel.impl.storemigration.StoreFile.COUNTS_STORE_RIGHT; diff --git a/enterprise/backup/src/test/java/org/neo4j/backup/BackupServiceStressTestingBuilder.java b/enterprise/backup/src/test/java/org/neo4j/backup/BackupServiceStressTestingBuilder.java index 9a5594534282..8c5dac7a9114 100644 --- a/enterprise/backup/src/test/java/org/neo4j/backup/BackupServiceStressTestingBuilder.java +++ b/enterprise/backup/src/test/java/org/neo4j/backup/BackupServiceStressTestingBuilder.java @@ -68,12 +68,6 @@ public class BackupServiceStressTestingBuilder private String backupHostname = "localhost"; private int backupPort = 8200; - public static BooleanSupplier untilTimeExpired( long duration, TimeUnit unit ) - { - final long endTimeInMilliseconds = currentTimeMillis() + unit.toMillis( duration ); - return () -> currentTimeMillis() <= endTimeInMilliseconds; - } - public BackupServiceStressTestingBuilder until( BooleanSupplier untilCondition ) { Objects.requireNonNull( untilCondition ); diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/discovery/ClusterMember.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/discovery/ClusterMember.java new file mode 100644 index 000000000000..b14fee7bb875 --- /dev/null +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/discovery/ClusterMember.java @@ -0,0 +1,27 @@ +/* + * Copyright (c) 2002-2016 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package org.neo4j.coreedge.discovery; + +public interface ClusterMember +{ + void start(); + + void shutdown(); +} diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/discovery/CoreClusterMember.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/discovery/CoreClusterMember.java index 4f50946cb381..317b55340f90 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/discovery/CoreClusterMember.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/discovery/CoreClusterMember.java @@ -43,7 +43,7 @@ import static org.neo4j.coreedge.core.consensus.log.RaftLog.PHYSICAL_LOG_DIRECTORY_NAME; import static org.neo4j.helpers.collection.MapUtil.stringMap; -public class CoreClusterMember +public class CoreClusterMember implements ClusterMember { private final File neo4jHome; private final DiscoveryServiceFactory discoveryServiceFactory; @@ -98,12 +98,14 @@ public CoreClusterMember( int serverId, int clusterSize, storeDir.mkdirs(); } + @Override public void start() { database = new CoreGraphDatabase( storeDir, config, GraphDatabaseDependencies.newDependencies(), discoveryServiceFactory ); } + @Override public void shutdown() { if ( database != null ) diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/discovery/EdgeClusterMember.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/discovery/EdgeClusterMember.java index 74bc5a29f0a5..0156d57a67ad 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/discovery/EdgeClusterMember.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/discovery/EdgeClusterMember.java @@ -34,7 +34,7 @@ import static java.util.stream.Collectors.joining; import static org.neo4j.helpers.collection.MapUtil.stringMap; -public class EdgeClusterMember +public class EdgeClusterMember implements ClusterMember { private final Map config = stringMap(); private final DiscoveryServiceFactory discoveryServiceFactory; @@ -76,12 +76,14 @@ public class EdgeClusterMember storeDir.mkdirs(); } + @Override public void start() { database = new EdgeGraphDatabase( storeDir, config, GraphDatabaseDependencies.newDependencies(), discoveryServiceFactory ); } + @Override public void shutdown() { if ( database != null ) diff --git a/stresstests/pom.xml b/stresstests/pom.xml index 91bd23bf1698..9d247244240a 100644 --- a/stresstests/pom.xml +++ b/stresstests/pom.xml @@ -103,5 +103,18 @@ test test-jar + + org.neo4j + neo4j-core-edge + ${project.version} + test + + + org.neo4j + neo4j-core-edge + ${project.version} + test + test-jar + diff --git a/stresstests/src/test/java/org/neo4j/backup/stresstests/BackupServiceStressTesting.java b/stresstests/src/test/java/org/neo4j/backup/stresstests/BackupServiceStressTesting.java index 3e475ae76507..9452098f9482 100644 --- a/stresstests/src/test/java/org/neo4j/backup/stresstests/BackupServiceStressTesting.java +++ b/stresstests/src/test/java/org/neo4j/backup/stresstests/BackupServiceStressTesting.java @@ -22,7 +22,6 @@ import org.junit.Test; import java.io.File; -import java.io.IOException; import java.util.concurrent.Callable; import org.neo4j.backup.BackupServiceStressTestingBuilder; @@ -31,12 +30,11 @@ import static java.lang.Integer.parseInt; import static java.lang.Long.parseLong; import static java.lang.System.getProperty; -import static java.lang.System.getenv; import static java.util.concurrent.TimeUnit.MINUTES; import static org.junit.Assert.assertEquals; import static org.neo4j.StressTestingHelper.ensureExistsAndEmpty; import static org.neo4j.StressTestingHelper.fromEnv; -import static org.neo4j.backup.BackupServiceStressTestingBuilder.untilTimeExpired; +import static org.neo4j.function.Suppliers.untilTimeExpired; /** * Notice the class name: this is _not_ going to be run as part of the main build. diff --git a/stresstests/src/test/java/org/neo4j/coreedge/stresstests/BackupStoreCopyInteractionStressTesting.java b/stresstests/src/test/java/org/neo4j/coreedge/stresstests/BackupStoreCopyInteractionStressTesting.java new file mode 100644 index 000000000000..ca9a93b793d5 --- /dev/null +++ b/stresstests/src/test/java/org/neo4j/coreedge/stresstests/BackupStoreCopyInteractionStressTesting.java @@ -0,0 +1,257 @@ +/* + * Copyright (c) 2002-2016 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package org.neo4j.coreedge.stresstests; + +import org.junit.Test; + +import java.io.File; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Random; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.locks.LockSupport; +import java.util.function.BooleanSupplier; +import java.util.function.IntFunction; + +import org.neo4j.backup.OnlineBackup; +import org.neo4j.backup.OnlineBackupSettings; +import org.neo4j.coreedge.discovery.Cluster; +import org.neo4j.coreedge.discovery.ClusterMember; +import org.neo4j.coreedge.discovery.HazelcastDiscoveryServiceFactory; +import org.neo4j.graphdb.DatabaseShutdownException; +import org.neo4j.helpers.AdvertisedSocketAddress; +import org.neo4j.helpers.SocketAddress; +import org.neo4j.io.fs.FileUtils; +import org.neo4j.kernel.impl.store.format.standard.StandardV3_0; + +import static java.lang.Integer.parseInt; +import static java.lang.Long.parseLong; +import static java.lang.System.getProperty; +import static org.neo4j.StressTestingHelper.ensureExistsAndEmpty; +import static org.neo4j.StressTestingHelper.fromEnv; +import static org.neo4j.function.Suppliers.untilTimeExpired; +import static org.neo4j.kernel.configuration.Settings.TRUE; + +public class BackupStoreCopyInteractionStressTesting +{ + private static final String DEFAULT_NUMBER_OF_CORES = "3"; + private static final String DEFAULT_NUMBER_OF_EDGES = "1"; + private static final String DEFAULT_DURATION_IN_MINUTES = "30"; + private static final String DEFAULT_WORKING_DIR = new File( getProperty( "java.io.tmpdir" ) ).getPath(); + private static final String DEFAULT_BASE_CORE_BACKUP_PORT = "8000"; + private static final String DEFAULT_BASE_EDGE_BACKUP_PORT = "9000"; + + @Test + public void shouldBehaveCorrectlyUnderStress() throws Exception + { + int numberOfCores = + parseInt( fromEnv( "BACKUP_STORE_COPY_INTERACTION_STRESS_NUMBER_OF_CORES", DEFAULT_NUMBER_OF_CORES ) ); + int numberOfEdges = + parseInt( fromEnv( "BACKUP_STORE_COPY_INTERACTION_STRESS_NUMBER_OF_EDGES", DEFAULT_NUMBER_OF_EDGES ) ); + long durationInMinutes = + parseLong( fromEnv( "BACKUP_STORE_COPY_INTERACTION_STRESS_DURATION", DEFAULT_DURATION_IN_MINUTES ) ); + String workingDirectory = + fromEnv( "BACKUP_STORE_COPY_INTERACTION_STRESS_WORKING_DIRECTORY", DEFAULT_WORKING_DIR ); + int baseCoreBackupPort = parseInt( fromEnv( "BACKUP_STORE_COPY_INTERACTION_STRESS_BASE_CORE_BACKUP_PORT", + DEFAULT_BASE_CORE_BACKUP_PORT ) ); + int baseEdgeBackupPort = parseInt( fromEnv( "BACKUP_STORE_COPY_INTERACTION_STRESS_BASE_EDGE_BACKUP_PORT", + DEFAULT_BASE_EDGE_BACKUP_PORT ) ); + + File clusterDirectory = ensureExistsAndEmpty( new File( workingDirectory, "cluster" ) ); + File backupDirectory = ensureExistsAndEmpty( new File( workingDirectory, "backups" ) ); + + Map params = Collections.emptyMap(); + Map> paramsPerCoreInstance = + configureBackup( baseCoreBackupPort, baseEdgeBackupPort, true ); + Map> paramsPerEdgeInstance = + configureBackup( baseCoreBackupPort, baseEdgeBackupPort, false ); + + HazelcastDiscoveryServiceFactory discoveryServiceFactory = new HazelcastDiscoveryServiceFactory(); + Cluster cluster = new Cluster( clusterDirectory, numberOfCores, numberOfEdges, discoveryServiceFactory, params, + paramsPerCoreInstance, params, paramsPerEdgeInstance, StandardV3_0.NAME ); + + ExecutorService service = Executors.newFixedThreadPool( 3 ); + BooleanSupplier keepGoing = untilTimeExpired( durationInMinutes, TimeUnit.MINUTES ); + + try + { + cluster.start(); + Future workload = service.submit( workLoad( cluster, keepGoing ) ); + Future startStopWorker = service.submit( startStopLoad( cluster, keepGoing ) ); + Future backupWorker = service.submit( + backupLoad( backupDirectory, baseCoreBackupPort, baseEdgeBackupPort, cluster, keepGoing ) ); + + workload.get(); + startStopWorker.get(); + backupWorker.get(); + } + finally + { + cluster.shutdown(); + } + + // let's cleanup disk space when everything went well + FileUtils.deleteRecursively( clusterDirectory ); + FileUtils.deleteRecursively( backupDirectory ); + } + + private Runnable workLoad( Cluster cluster, BooleanSupplier keepGoing ) + { + return new RepeatUntilRunnable( keepGoing ) + { + @Override + protected void doWork() + { + try + { + cluster.coreTx( ( db, tx ) -> + { + db.createNode(); + tx.success(); + } ); + } + catch ( InterruptedException e ) + { + // whatever let's go on with the workload + Thread.interrupted(); + } + catch ( TimeoutException | DatabaseShutdownException e ) + { + // whatever let's go on with the workload + } + } + }; + } + + private Runnable startStopLoad( Cluster cluster, BooleanSupplier keepGoing ) + { + return new RepeatUntilOnSelectedMemberRunnable( keepGoing, cluster ) + { + @Override + protected void doWorkOnMember( boolean isCore, int id ) + { + ClusterMember member = pickSingleMember( cluster, id, isCore ); + member.shutdown(); + LockSupport.parkNanos( 500_000_000 ); + member.start(); + } + }; + } + + private Runnable backupLoad( File baseDirectory, int baseCoreBackupPort, int baseEdgeBackupPort, Cluster cluster, + BooleanSupplier keepGoing ) + { + return new RepeatUntilOnSelectedMemberRunnable( keepGoing, cluster ) + { + @Override + protected void doWorkOnMember( boolean isCore, int id ) + { + SocketAddress address = backupAddress( baseCoreBackupPort, baseEdgeBackupPort, isCore, id ); + File backupDirectory = new File( baseDirectory, Integer.toString( address.getPort() ) ); + OnlineBackup backup = + OnlineBackup.from( address.getHostname(), address.getPort() ).backup( backupDirectory ); + + if ( !backup.isConsistent() ) + { + System.err.println( "Not consistent backup from " + address ); + } + } + }; + } + + private static abstract class RepeatUntilOnSelectedMemberRunnable extends RepeatUntilRunnable + { + private final Random random = new Random(); + private final Cluster cluster; + + RepeatUntilOnSelectedMemberRunnable( BooleanSupplier keepGoing, Cluster cluster ) + { + super( keepGoing ); + this.cluster = cluster; + } + + @Override + protected final void doWork() + { + boolean isCore = random.nextBoolean(); + Collection members = pickMembers( cluster, isCore ); + if ( members.isEmpty() ) + { + return; + } + int id = random.nextInt( members.size() ); + doWorkOnMember( isCore, id ); + } + + protected abstract void doWorkOnMember( boolean isCore, int id ); + } + + private static abstract class RepeatUntilRunnable implements Runnable + { + private BooleanSupplier keepGoing; + + RepeatUntilRunnable( BooleanSupplier keepGoing ) + { + this.keepGoing = keepGoing; + } + + @Override + public final void run() + { + while ( keepGoing.getAsBoolean() ) + { + doWork(); + } + } + + protected abstract void doWork(); + } + + private static Collection pickMembers( Cluster cluster, boolean isCore ) + { + return isCore ? cluster.coreMembers() : cluster.edgeMembers(); + } + + private static ClusterMember pickSingleMember( Cluster cluster, int id, boolean isCore ) + { + return isCore ? cluster.getCoreMemberById( id ) : cluster.getEdgeMemberById( id ); + } + + private static Map> configureBackup( int baseCoreBackupPort, int baseEdgeBackupPort, + boolean isCore ) + { + Map> settings = new HashMap<>(); + settings.put( OnlineBackupSettings.online_backup_enabled.name(), id -> TRUE ); + settings.put( OnlineBackupSettings.online_backup_server.name(), + id -> backupAddress( baseCoreBackupPort, baseEdgeBackupPort, isCore, id ).toString() ); + return settings; + } + + private static SocketAddress backupAddress( int baseCoreBackupPort, int baseEdgeBackupPort, boolean isCore, int id ) + { + return new AdvertisedSocketAddress( "localhost", (isCore ? baseCoreBackupPort : baseEdgeBackupPort) + id ); + } +} diff --git a/stresstests/src/test/java/org/neo4j/kernel/stresstests/transaction/log/TransactionAppenderStressTesting.java b/stresstests/src/test/java/org/neo4j/kernel/stresstests/transaction/log/TransactionAppenderStressTesting.java index f6c2294e33ef..17a02291febd 100644 --- a/stresstests/src/test/java/org/neo4j/kernel/stresstests/transaction/log/TransactionAppenderStressTesting.java +++ b/stresstests/src/test/java/org/neo4j/kernel/stresstests/transaction/log/TransactionAppenderStressTesting.java @@ -34,6 +34,7 @@ import static org.junit.Assert.assertEquals; import static org.neo4j.StressTestingHelper.ensureExistsAndEmpty; import static org.neo4j.StressTestingHelper.fromEnv; +import static org.neo4j.function.Suppliers.untilTimeExpired; /** * Notice the class name: this is _not_ going to be run as part of the main build. @@ -52,7 +53,7 @@ public void shouldBehaveCorrectlyUnderStress() throws Throwable int threads = parseInt( fromEnv( "TX_APPENDER_NUM_THREADS", DEFAULT_NUM_THREADS ) ); Callable runner = new Builder() - .with( Builder.untilTimeExpired( durationInMinutes, MINUTES ) ) + .with( untilTimeExpired( durationInMinutes, MINUTES ) ) .withWorkingDirectory( ensureExistsAndEmpty( workingDirectory ) ) .withNumThreads( threads ) .build();