From c2d959659d5c91103e2f592b55b4cd9c58b39e41 Mon Sep 17 00:00:00 2001 From: Martin Furmanski Date: Wed, 21 Mar 2018 12:17:40 +0100 Subject: [PATCH] In process backup using new command in stress test --- .../neo4j/causalclustering/BackupUtil.java | 19 ++++++++- .../tx/ReplicatedTransactionStateMachine.java | 1 - .../discovery/ClusterMember.java | 3 ++ .../discovery/CoreClusterMember.java | 6 +-- .../discovery/ReadReplica.java | 8 ++++ .../ConvertNonCausalClusteringStoreIT.java | 2 +- .../stresstests/BackupLoad.java | 40 +++++-------------- ...ckupStoreCopyInteractionStressTesting.java | 29 ++------------ .../stresstests/ClusterConfiguration.java | 13 ------ .../RepeatUntilOnSelectedMemberCallable.java | 4 +- .../org/neo4j/helper/RepeatUntilCallable.java | 4 +- 11 files changed, 49 insertions(+), 80 deletions(-) diff --git a/enterprise/backup/src/test/java/org/neo4j/causalclustering/BackupUtil.java b/enterprise/backup/src/test/java/org/neo4j/causalclustering/BackupUtil.java index a9758233e59a8..8e9fdd1a1f787 100644 --- a/enterprise/backup/src/test/java/org/neo4j/causalclustering/BackupUtil.java +++ b/enterprise/backup/src/test/java/org/neo4j/causalclustering/BackupUtil.java @@ -22,8 +22,12 @@ import java.io.File; import java.io.IOException; +import org.neo4j.backup.impl.OnlineBackupCommandBuilder; +import org.neo4j.causalclustering.core.CausalClusteringSettings; +import org.neo4j.causalclustering.discovery.ClusterMember; import org.neo4j.causalclustering.discovery.CoreClusterMember; import org.neo4j.commandline.admin.CommandFailed; +import org.neo4j.helpers.AdvertisedSocketAddress; import org.neo4j.io.fs.FileSystemAbstraction; import org.neo4j.kernel.configuration.Config; import org.neo4j.restore.RestoreDatabaseCommand; @@ -46,7 +50,20 @@ public static File createBackupFromCore( CoreClusterMember core, String backupNa return new File( baseBackupDir, backupName ); } - static void restoreFromBackup( File backup, FileSystemAbstraction fsa, CoreClusterMember coreClusterMember ) throws IOException, CommandFailed + public static File createBackupInProcess( ClusterMember member, File baseBackupDir, String backupName ) throws Exception + { + AdvertisedSocketAddress address = member.config().get( CausalClusteringSettings.transaction_advertised_address ); + File targetDir = new File( baseBackupDir, backupName ); + + new OnlineBackupCommandBuilder() + .withHost( address.getHostname() ) + .withPort( address.getPort() ) + .backup( targetDir ); + + return targetDir; + } + + public static void restoreFromBackup( File backup, FileSystemAbstraction fsa, CoreClusterMember coreClusterMember ) throws IOException, CommandFailed { Config config = coreClusterMember.config(); RestoreDatabaseCommand restoreDatabaseCommand = new RestoreDatabaseCommand( fsa, backup, config, "graph-db", true ); diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/state/machines/tx/ReplicatedTransactionStateMachine.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/state/machines/tx/ReplicatedTransactionStateMachine.java index d3175f67c594b..ad7bf30a24fe8 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/state/machines/tx/ReplicatedTransactionStateMachine.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/state/machines/tx/ReplicatedTransactionStateMachine.java @@ -19,7 +19,6 @@ */ package org.neo4j.causalclustering.core.state.machines.tx; -import java.io.IOException; import java.util.function.Consumer; import org.neo4j.causalclustering.core.state.Result; diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/discovery/ClusterMember.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/discovery/ClusterMember.java index e30d5a6c2786c..a790a28845ebb 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/discovery/ClusterMember.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/discovery/ClusterMember.java @@ -19,6 +19,7 @@ */ package org.neo4j.causalclustering.discovery; +import org.neo4j.kernel.configuration.Config; import org.neo4j.kernel.internal.GraphDatabaseAPI; import org.neo4j.kernel.monitoring.Monitors; @@ -34,6 +35,8 @@ public interface ClusterMember String settingValue( String settingName ); + Config config(); + /** * {@link Cluster} will use this {@link ThreadGroup} for the threads that start, and shut down, this cluster member. * This way, the group will be transitively inherited by all the threads that are in turn started by the member diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/discovery/CoreClusterMember.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/discovery/CoreClusterMember.java index 163caed9441c3..dd137871a70b1 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/discovery/CoreClusterMember.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/discovery/CoreClusterMember.java @@ -196,11 +196,6 @@ public File storeDir() return storeDir; } - public Config getMemberConfig() - { - return memberConfig; - } - public RaftLogPruner raftLogPruner() { return database.getDependencyResolver().resolveDependency( RaftLogPruner.class ); @@ -258,6 +253,7 @@ public String settingValue( String settingName ) return config.get(settingName); } + @Override public Config config() { return memberConfig; diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/discovery/ReadReplica.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/discovery/ReadReplica.java index fb1032aa3d098..918dc5db680e5 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/discovery/ReadReplica.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/discovery/ReadReplica.java @@ -54,6 +54,7 @@ public class ReadReplica implements ClusterMember protected final File storeDir; private final int serverId; private final String boltAdvertisedSocketAddress; + private final Config memberConfig; protected ReadReplicaGraphDatabase database; protected Monitors monitors; private final ThreadGroup threadGroup; @@ -99,6 +100,7 @@ public ReadReplica( File parentDir, int serverId, int boltPort, int httpPort, in config.put( OnlineBackupSettings.online_backup_server.name(), listenAddress( listenAddress, backupPort ) ); config.put( GraphDatabaseSettings.logs_directory.name(), new File( neo4jHome, "logs" ).getAbsolutePath() ); config.put( GraphDatabaseSettings.logical_logs_location.name(), "replica-tx-logs-" + serverId ); + memberConfig = Config.defaults( config ); this.discoveryServiceFactory = discoveryServiceFactory; storeDir = new File( new File( new File( neo4jHome, "data" ), "databases" ), "graph.db" ); @@ -207,4 +209,10 @@ public int serverId() { return serverId; } + + @Override + public Config config() + { + return memberConfig; + } } diff --git a/integrationtests/src/test/java/org/neo4j/causalclustering/scenarios/ConvertNonCausalClusteringStoreIT.java b/integrationtests/src/test/java/org/neo4j/causalclustering/scenarios/ConvertNonCausalClusteringStoreIT.java index 8fe787bde0602..fc32c46237400 100644 --- a/integrationtests/src/test/java/org/neo4j/causalclustering/scenarios/ConvertNonCausalClusteringStoreIT.java +++ b/integrationtests/src/test/java/org/neo4j/causalclustering/scenarios/ConvertNonCausalClusteringStoreIT.java @@ -83,7 +83,7 @@ public void shouldReplicateTransactionToCoreMembers() throws Throwable { for ( CoreClusterMember core : cluster.coreMembers() ) { - new RestoreDatabaseCommand( fileSystem, classicNeo4jStore, core.getMemberConfig(), core.settingValue( + new RestoreDatabaseCommand( fileSystem, classicNeo4jStore, core.config(), core.settingValue( GraphDatabaseSettings.active_database.name() ), true ).execute(); } } diff --git a/stresstests/src/test/java/org/neo4j/causalclustering/stresstests/BackupLoad.java b/stresstests/src/test/java/org/neo4j/causalclustering/stresstests/BackupLoad.java index 740491be4404c..8e71638e79ce3 100644 --- a/stresstests/src/test/java/org/neo4j/causalclustering/stresstests/BackupLoad.java +++ b/stresstests/src/test/java/org/neo4j/causalclustering/stresstests/BackupLoad.java @@ -20,58 +20,43 @@ package org.neo4j.causalclustering.stresstests; import java.io.File; -import java.io.IOException; -import java.io.OutputStream; import java.util.concurrent.locks.LockSupport; -import java.util.function.BiFunction; import java.util.function.BooleanSupplier; import java.util.function.Predicate; +import org.neo4j.causalclustering.BackupUtil; +import org.neo4j.causalclustering.discovery.ClusterMember; import org.neo4j.helper.IsChannelClosedException; import org.neo4j.helper.IsConnectionException; import org.neo4j.helper.IsConnectionRestByPeer; import org.neo4j.helper.IsStoreClosed; -import org.neo4j.backup.OnlineBackup; import org.neo4j.causalclustering.discovery.Cluster; -import org.neo4j.helpers.SocketAddress; class BackupLoad extends RepeatUntilOnSelectedMemberCallable { - private static final OutputStream NULL_OUTPUT_STREAM = new OutputStream() - { - @Override - public void write( int b ) - { - // it's *null* output stream - } - }; - private final Predicate isTransientError = new IsConnectionException().or( new IsConnectionRestByPeer() ).or( new IsChannelClosedException() ) .or( new IsStoreClosed() ); - private final File baseDirectory; - private final BiFunction backupAddress; + private final File baseBackupDir; + private long backupNumber; BackupLoad( BooleanSupplier keepGoing, Runnable onFailure, Cluster cluster, int numberOfCores, int numberOfEdges, - File baseDirectory, BiFunction backupAddress ) + File baseBackupDir ) { super( keepGoing, onFailure, cluster, numberOfCores, numberOfEdges ); - this.baseDirectory = baseDirectory; - this.backupAddress = backupAddress; + this.baseBackupDir = baseBackupDir; } @Override - protected void doWorkOnMember( boolean isCore, int id ) + protected void doWorkOnMember( boolean isCore, int id ) throws Exception { - SocketAddress address = backupAddress.apply( isCore, id ); - File backupDirectory = new File( baseDirectory, Integer.toString( address.getPort() ) ); + ClusterMember member = isCore ? cluster.getCoreMemberById( id ) : cluster.getReadReplicaById( id ); - OnlineBackup backup; try { - backup = OnlineBackup.from( address.getHostname(), address.getPort() ).withOutput( NULL_OUTPUT_STREAM ) - .backup( backupDirectory ); + String backupName = "backup-" + backupNumber++; + BackupUtil.createBackupInProcess( member, baseBackupDir, backupName ); } catch ( RuntimeException e ) { @@ -83,10 +68,5 @@ protected void doWorkOnMember( boolean isCore, int id ) } throw e; } - - if ( !backup.isConsistent() ) - { - throw new RuntimeException( "Not consistent backup from " + address ); - } } } diff --git a/stresstests/src/test/java/org/neo4j/causalclustering/stresstests/BackupStoreCopyInteractionStressTesting.java b/stresstests/src/test/java/org/neo4j/causalclustering/stresstests/BackupStoreCopyInteractionStressTesting.java index 24df2a158476b..9c54e3bfdb81a 100644 --- a/stresstests/src/test/java/org/neo4j/causalclustering/stresstests/BackupStoreCopyInteractionStressTesting.java +++ b/stresstests/src/test/java/org/neo4j/causalclustering/stresstests/BackupStoreCopyInteractionStressTesting.java @@ -26,9 +26,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.function.BiFunction; import java.util.function.BooleanSupplier; -import java.util.function.IntFunction; import org.junit.Before; import org.junit.Rule; @@ -39,8 +37,6 @@ import org.neo4j.causalclustering.discovery.HazelcastDiscoveryServiceFactory; import org.neo4j.causalclustering.discovery.IpFamily; import org.neo4j.concurrent.Futures; -import org.neo4j.helpers.AdvertisedSocketAddress; -import org.neo4j.helpers.SocketAddress; import org.neo4j.io.fs.FileSystemAbstraction; import org.neo4j.io.fs.FileUtils; import org.neo4j.io.pagecache.PageCache; @@ -52,8 +48,8 @@ import static java.lang.Integer.parseInt; import static java.lang.Long.parseLong; import static java.lang.System.getProperty; +import static java.util.Collections.emptyMap; import static java.util.concurrent.TimeUnit.MINUTES; -import static org.neo4j.causalclustering.stresstests.ClusterConfiguration.configureBackup; import static org.neo4j.causalclustering.stresstests.ClusterConfiguration.configureRaftLogRotationAndPruning; import static org.neo4j.causalclustering.stresstests.ClusterConfiguration.enableRaftMessageLogging; import static org.neo4j.function.Suppliers.untilTimeExpired; @@ -69,8 +65,6 @@ public class BackupStoreCopyInteractionStressTesting private static final String DEFAULT_ENABLE_INDEXES = "false"; private static final String DEFAULT_TX_PRUNE = "50 files"; private static final String DEFAULT_WORKING_DIR = new File( getProperty( "java.io.tmpdir" ) ).getPath(); - private static final String DEFAULT_BASE_CORE_BACKUP_PORT = "8000"; - private static final String DEFAULT_BASE_EDGE_BACKUP_PORT = "9000"; private final DefaultFileSystemRule fileSystemRule = new DefaultFileSystemRule(); private final PageCacheRule pageCacheRule = new PageCacheRule(); @@ -99,10 +93,6 @@ public void shouldBehaveCorrectlyUnderStress() throws Exception parseLong( fromEnv( "BACKUP_STORE_COPY_INTERACTION_STRESS_DURATION", DEFAULT_DURATION_IN_MINUTES ) ); String workingDirectory = fromEnv( "BACKUP_STORE_COPY_INTERACTION_STRESS_WORKING_DIRECTORY", DEFAULT_WORKING_DIR ); - int baseCoreBackupPort = parseInt( fromEnv( "BACKUP_STORE_COPY_INTERACTION_STRESS_BASE_CORE_BACKUP_PORT", - DEFAULT_BASE_CORE_BACKUP_PORT ) ); - int baseEdgeBackupPort = parseInt( fromEnv( "BACKUP_STORE_COPY_INTERACTION_STRESS_BASE_EDGE_BACKUP_PORT", - DEFAULT_BASE_EDGE_BACKUP_PORT ) ); boolean enableIndexes = parseBoolean( fromEnv( "BACKUP_STORE_COPY_INTERACTION_STRESS_ENABLE_INDEXES", DEFAULT_ENABLE_INDEXES ) ); String txPrune = fromEnv( "BACKUP_STORE_COPY_INTERACTION_STRESS_TX_PRUNE", DEFAULT_TX_PRUNE ); @@ -110,23 +100,13 @@ public void shouldBehaveCorrectlyUnderStress() throws Exception File clusterDirectory = ensureExistsAndEmpty( new File( workingDirectory, "cluster" ) ); File backupDirectory = ensureExistsAndEmpty( new File( workingDirectory, "backups" ) ); - BiFunction backupAddress = ( isCore, id ) -> - new AdvertisedSocketAddress( "localhost", (isCore ? baseCoreBackupPort : baseEdgeBackupPort) + id ); - Map coreParams = enableRaftMessageLogging( configureRaftLogRotationAndPruning( configureTxLogRotationAndPruning( new HashMap<>(), txPrune ) ) ); Map readReplicaParams = configureTxLogRotationAndPruning( new HashMap<>(), txPrune ); - Map> instanceCoreParams = - configureBackup( new HashMap<>(), id -> backupAddress.apply( true, id ) ); - Map> instanceReadReplicaParams = - configureBackup( new HashMap<>(), id -> backupAddress.apply( false, id ) ); - HazelcastDiscoveryServiceFactory discoveryServiceFactory = new HazelcastDiscoveryServiceFactory(); - Cluster cluster = - new Cluster( clusterDirectory, numberOfCores, numberOfEdges, discoveryServiceFactory, coreParams, - instanceCoreParams, readReplicaParams, instanceReadReplicaParams, Standard.LATEST_NAME, - IpFamily.IPV4, false ); + Cluster cluster = new Cluster( clusterDirectory, numberOfCores, numberOfEdges, discoveryServiceFactory, coreParams, + emptyMap(), readReplicaParams, emptyMap(), Standard.LATEST_NAME, IpFamily.IPV4, false ); AtomicBoolean stopTheWorld = new AtomicBoolean(); BooleanSupplier notExpired = untilTimeExpired( durationInMinutes, MINUTES ); @@ -146,8 +126,7 @@ public void shouldBehaveCorrectlyUnderStress() throws Exception Future startStopWorker = service.submit( new StartStopLoad( fs, pageCache, keepGoing, onFailure, cluster, numberOfCores, numberOfEdges ) ); Future backupWorker = service.submit( - new BackupLoad( keepGoing, onFailure, cluster, numberOfCores, numberOfEdges, backupDirectory, - backupAddress ) ); + new BackupLoad( keepGoing, onFailure, cluster, numberOfCores, numberOfEdges, backupDirectory ) ); Futures.combine(workload, startStopWorker, backupWorker).get( durationInMinutes + 5, MINUTES ); } diff --git a/stresstests/src/test/java/org/neo4j/causalclustering/stresstests/ClusterConfiguration.java b/stresstests/src/test/java/org/neo4j/causalclustering/stresstests/ClusterConfiguration.java index 5c37c4a45e052..9d39baa913684 100644 --- a/stresstests/src/test/java/org/neo4j/causalclustering/stresstests/ClusterConfiguration.java +++ b/stresstests/src/test/java/org/neo4j/causalclustering/stresstests/ClusterConfiguration.java @@ -20,15 +20,10 @@ package org.neo4j.causalclustering.stresstests; import java.util.Map; -import java.util.function.IntFunction; import org.neo4j.causalclustering.core.CausalClusteringSettings; -import org.neo4j.helpers.SocketAddress; -import org.neo4j.kernel.impl.enterprise.configuration.OnlineBackupSettings; import org.neo4j.kernel.configuration.Settings; -import static org.neo4j.kernel.configuration.Settings.TRUE; - class ClusterConfiguration { private ClusterConfiguration() @@ -49,12 +44,4 @@ static Map configureRaftLogRotationAndPruning( Map settings.put( CausalClusteringSettings.raft_log_pruning_strategy.name(), "keep_none" ); return settings; } - - static Map> configureBackup( Map> settings, - IntFunction address ) - { - settings.put( OnlineBackupSettings.online_backup_enabled.name(), id -> TRUE ); - settings.put( OnlineBackupSettings.online_backup_server.name(), id -> address.apply( id ).toString() ); - return settings; - } } diff --git a/stresstests/src/test/java/org/neo4j/causalclustering/stresstests/RepeatUntilOnSelectedMemberCallable.java b/stresstests/src/test/java/org/neo4j/causalclustering/stresstests/RepeatUntilOnSelectedMemberCallable.java index 66eae4286b777..9d1efa2f46e69 100644 --- a/stresstests/src/test/java/org/neo4j/causalclustering/stresstests/RepeatUntilOnSelectedMemberCallable.java +++ b/stresstests/src/test/java/org/neo4j/causalclustering/stresstests/RepeatUntilOnSelectedMemberCallable.java @@ -44,7 +44,7 @@ abstract class RepeatUntilOnSelectedMemberCallable extends RepeatUntilCallable } @Override - protected final void doWork() + protected final void doWork() throws Exception { boolean isCore = numberOfEdges == 0 || random.nextBoolean(); Collection members = isCore ? cluster.coreMembers() : cluster.readReplicas(); @@ -53,5 +53,5 @@ protected final void doWork() doWorkOnMember( isCore, id ); } - protected abstract void doWorkOnMember( boolean isCore, int id ); + protected abstract void doWorkOnMember( boolean isCore, int id ) throws Exception; } diff --git a/stresstests/src/test/java/org/neo4j/helper/RepeatUntilCallable.java b/stresstests/src/test/java/org/neo4j/helper/RepeatUntilCallable.java index 84eb35a32527a..84df34a885fd4 100644 --- a/stresstests/src/test/java/org/neo4j/helper/RepeatUntilCallable.java +++ b/stresstests/src/test/java/org/neo4j/helper/RepeatUntilCallable.java @@ -45,9 +45,9 @@ public final void run() catch ( Throwable t ) { onFailure.run(); - throw t; + throw new RuntimeException( t ); } } - protected abstract void doWork(); + protected abstract void doWork() throws Exception; }