In process backup using new command in stress test
martinfurmanski committed Mar 21, 2018
1 parent e2a5f5d commit c2d9596
Showing 11 changed files with 49 additions and 80 deletions.
@@ -22,8 +22,12 @@
 import java.io.File;
 import java.io.IOException;
 
+import org.neo4j.backup.impl.OnlineBackupCommandBuilder;
+import org.neo4j.causalclustering.core.CausalClusteringSettings;
+import org.neo4j.causalclustering.discovery.ClusterMember;
 import org.neo4j.causalclustering.discovery.CoreClusterMember;
 import org.neo4j.commandline.admin.CommandFailed;
+import org.neo4j.helpers.AdvertisedSocketAddress;
 import org.neo4j.io.fs.FileSystemAbstraction;
 import org.neo4j.kernel.configuration.Config;
 import org.neo4j.restore.RestoreDatabaseCommand;
@@ -46,7 +50,20 @@ public static File createBackupFromCore( CoreClusterMember core, String backupName
         return new File( baseBackupDir, backupName );
     }
 
-    static void restoreFromBackup( File backup, FileSystemAbstraction fsa, CoreClusterMember coreClusterMember ) throws IOException, CommandFailed
+    public static File createBackupInProcess( ClusterMember<?> member, File baseBackupDir, String backupName ) throws Exception
+    {
+        AdvertisedSocketAddress address = member.config().get( CausalClusteringSettings.transaction_advertised_address );
+        File targetDir = new File( baseBackupDir, backupName );
+
+        new OnlineBackupCommandBuilder()
+                .withHost( address.getHostname() )
+                .withPort( address.getPort() )
+                .backup( targetDir );
+
+        return targetDir;
+    }
+
+    public static void restoreFromBackup( File backup, FileSystemAbstraction fsa, CoreClusterMember coreClusterMember ) throws IOException, CommandFailed
     {
         Config config = coreClusterMember.config();
         RestoreDatabaseCommand restoreDatabaseCommand = new RestoreDatabaseCommand( fsa, backup, config, "graph-db", true );

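For orientation, a minimal round-trip sketch of how the new BackupUtil.createBackupInProcess helper might be exercised from a test alongside the existing restoreFromBackup. The fixtures (cluster, fileSystem, testDirectory) and the backup name are assumptions for illustration and are not part of this commit:

// Hypothetical usage sketch (assumed fixtures: a started Cluster, a
// FileSystemAbstraction from DefaultFileSystemRule, a TestDirectory rule).
@Test
public void backupAndRestoreRoundTrip() throws Exception
{
    File baseBackupDir = testDirectory.directory( "backups" );
    CoreClusterMember core = cluster.getCoreMemberById( 0 );

    // Takes an in-process backup over the member's advertised transaction address.
    File backup = BackupUtil.createBackupInProcess( core, baseBackupDir, "backup-0" );

    // The member should be offline before its store is overwritten by the restore.
    cluster.shutdown();
    BackupUtil.restoreFromBackup( backup, fileSystem, core );
    cluster.start();
}
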
@@ -19,7 +19,6 @@
  */
 package org.neo4j.causalclustering.core.state.machines.tx;
 
-import java.io.IOException;
 import java.util.function.Consumer;
 
 import org.neo4j.causalclustering.core.state.Result;

@@ -19,6 +19,7 @@
  */
 package org.neo4j.causalclustering.discovery;
 
+import org.neo4j.kernel.configuration.Config;
 import org.neo4j.kernel.internal.GraphDatabaseAPI;
 import org.neo4j.kernel.monitoring.Monitors;
 
@@ -34,6 +35,8 @@ public interface ClusterMember<T extends GraphDatabaseAPI>
 
     String settingValue( String settingName );
 
+    Config config();
+
     /**
      * {@link Cluster} will use this {@link ThreadGroup} for the threads that start, and shut down, this cluster member.
      * This way, the group will be transitively inherited by all the threads that are in turn started by the member

@@ -196,11 +196,6 @@ public File storeDir()
         return storeDir;
     }
 
-    public Config getMemberConfig()
-    {
-        return memberConfig;
-    }
-
     public RaftLogPruner raftLogPruner()
     {
         return database.getDependencyResolver().resolveDependency( RaftLogPruner.class );
@@ -258,6 +253,7 @@ public String settingValue( String settingName )
         return config.get(settingName);
     }
 
+    @Override
     public Config config()
     {
         return memberConfig;

@@ -54,6 +54,7 @@ public class ReadReplica implements ClusterMember
     protected final File storeDir;
     private final int serverId;
     private final String boltAdvertisedSocketAddress;
+    private final Config memberConfig;
     protected ReadReplicaGraphDatabase database;
     protected Monitors monitors;
     private final ThreadGroup threadGroup;
@@ -99,6 +100,7 @@ public ReadReplica( File parentDir, int serverId, int boltPort, int httpPort, in
         config.put( OnlineBackupSettings.online_backup_server.name(), listenAddress( listenAddress, backupPort ) );
         config.put( GraphDatabaseSettings.logs_directory.name(), new File( neo4jHome, "logs" ).getAbsolutePath() );
         config.put( GraphDatabaseSettings.logical_logs_location.name(), "replica-tx-logs-" + serverId );
+        memberConfig = Config.defaults( config );
 
         this.discoveryServiceFactory = discoveryServiceFactory;
         storeDir = new File( new File( new File( neo4jHome, "data" ), "databases" ), "graph.db" );
@@ -207,4 +209,10 @@ public int serverId()
     {
         return serverId;
     }
+
+    @Override
+    public Config config()
+    {
+        return memberConfig;
+    }
 }

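The point of exposing config() on both CoreClusterMember and ReadReplica is that callers no longer need to know which concrete member type they hold. A minimal illustration, mirroring how the new backup path uses the interface; the cluster, isCore and id variables are assumed from a surrounding test and are not part of this commit:

// Cores and read replicas resolved uniformly through the ClusterMember interface.
ClusterMember<?> member = isCore ? cluster.getCoreMemberById( id ) : cluster.getReadReplicaById( id );
AdvertisedSocketAddress txAddress =
        member.config().get( CausalClusteringSettings.transaction_advertised_address );
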
@@ -83,7 +83,7 @@ public void shouldReplicateTransactionToCoreMembers() throws Throwable
         {
             for ( CoreClusterMember core : cluster.coreMembers() )
             {
-                new RestoreDatabaseCommand( fileSystem, classicNeo4jStore, core.getMemberConfig(), core.settingValue(
+                new RestoreDatabaseCommand( fileSystem, classicNeo4jStore, core.config(), core.settingValue(
                         GraphDatabaseSettings.active_database.name() ), true ).execute();
             }
         }

@@ -20,58 +20,43 @@
 package org.neo4j.causalclustering.stresstests;
 
 import java.io.File;
-import java.io.IOException;
-import java.io.OutputStream;
 import java.util.concurrent.locks.LockSupport;
-import java.util.function.BiFunction;
 import java.util.function.BooleanSupplier;
 import java.util.function.Predicate;
 
+import org.neo4j.causalclustering.BackupUtil;
+import org.neo4j.causalclustering.discovery.ClusterMember;
 import org.neo4j.helper.IsChannelClosedException;
 import org.neo4j.helper.IsConnectionException;
 import org.neo4j.helper.IsConnectionRestByPeer;
 import org.neo4j.helper.IsStoreClosed;
-import org.neo4j.backup.OnlineBackup;
 import org.neo4j.causalclustering.discovery.Cluster;
-import org.neo4j.helpers.SocketAddress;
 
 class BackupLoad extends RepeatUntilOnSelectedMemberCallable
 {
-    private static final OutputStream NULL_OUTPUT_STREAM = new OutputStream()
-    {
-        @Override
-        public void write( int b )
-        {
-            // it's *null* output stream
-        }
-    };
-
     private final Predicate<Throwable> isTransientError =
             new IsConnectionException().or( new IsConnectionRestByPeer() ).or( new IsChannelClosedException() )
                     .or( new IsStoreClosed() );
 
-    private final File baseDirectory;
-    private final BiFunction<Boolean,Integer,SocketAddress> backupAddress;
+    private final File baseBackupDir;
+    private long backupNumber;
 
     BackupLoad( BooleanSupplier keepGoing, Runnable onFailure, Cluster cluster, int numberOfCores, int numberOfEdges,
-            File baseDirectory, BiFunction<Boolean,Integer,SocketAddress> backupAddress )
+            File baseBackupDir )
     {
         super( keepGoing, onFailure, cluster, numberOfCores, numberOfEdges );
-        this.baseDirectory = baseDirectory;
-        this.backupAddress = backupAddress;
+        this.baseBackupDir = baseBackupDir;
    }
 
     @Override
-    protected void doWorkOnMember( boolean isCore, int id )
+    protected void doWorkOnMember( boolean isCore, int id ) throws Exception
     {
-        SocketAddress address = backupAddress.apply( isCore, id );
-        File backupDirectory = new File( baseDirectory, Integer.toString( address.getPort() ) );
+        ClusterMember<?> member = isCore ? cluster.getCoreMemberById( id ) : cluster.getReadReplicaById( id );
 
-        OnlineBackup backup;
         try
         {
-            backup = OnlineBackup.from( address.getHostname(), address.getPort() ).withOutput( NULL_OUTPUT_STREAM )
-                    .backup( backupDirectory );
+            String backupName = "backup-" + backupNumber++;
+            BackupUtil.createBackupInProcess( member, baseBackupDir, backupName );
         }
         catch ( RuntimeException e )
         {
@@ -83,10 +68,5 @@ protected void doWorkOnMember( boolean isCore, int id )
             }
             throw e;
         }
-
-        if ( !backup.isConsistent() )
-        {
-            throw new RuntimeException( "Not consistent backup from " + address );
-        }
     }
 }

@@ -26,9 +26,7 @@
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.function.BiFunction;
 import java.util.function.BooleanSupplier;
-import java.util.function.IntFunction;
 
 import org.junit.Before;
 import org.junit.Rule;
@@ -39,8 +37,6 @@
 import org.neo4j.causalclustering.discovery.HazelcastDiscoveryServiceFactory;
 import org.neo4j.causalclustering.discovery.IpFamily;
 import org.neo4j.concurrent.Futures;
-import org.neo4j.helpers.AdvertisedSocketAddress;
-import org.neo4j.helpers.SocketAddress;
 import org.neo4j.io.fs.FileSystemAbstraction;
 import org.neo4j.io.fs.FileUtils;
 import org.neo4j.io.pagecache.PageCache;
@@ -52,8 +48,8 @@
 import static java.lang.Integer.parseInt;
 import static java.lang.Long.parseLong;
 import static java.lang.System.getProperty;
+import static java.util.Collections.emptyMap;
 import static java.util.concurrent.TimeUnit.MINUTES;
-import static org.neo4j.causalclustering.stresstests.ClusterConfiguration.configureBackup;
 import static org.neo4j.causalclustering.stresstests.ClusterConfiguration.configureRaftLogRotationAndPruning;
 import static org.neo4j.causalclustering.stresstests.ClusterConfiguration.enableRaftMessageLogging;
 import static org.neo4j.function.Suppliers.untilTimeExpired;
@@ -69,8 +65,6 @@ public class BackupStoreCopyInteractionStressTesting
     private static final String DEFAULT_ENABLE_INDEXES = "false";
     private static final String DEFAULT_TX_PRUNE = "50 files";
     private static final String DEFAULT_WORKING_DIR = new File( getProperty( "java.io.tmpdir" ) ).getPath();
-    private static final String DEFAULT_BASE_CORE_BACKUP_PORT = "8000";
-    private static final String DEFAULT_BASE_EDGE_BACKUP_PORT = "9000";
 
     private final DefaultFileSystemRule fileSystemRule = new DefaultFileSystemRule();
     private final PageCacheRule pageCacheRule = new PageCacheRule();
@@ -99,34 +93,20 @@ public void shouldBehaveCorrectlyUnderStress() throws Exception
                 parseLong( fromEnv( "BACKUP_STORE_COPY_INTERACTION_STRESS_DURATION", DEFAULT_DURATION_IN_MINUTES ) );
         String workingDirectory =
                 fromEnv( "BACKUP_STORE_COPY_INTERACTION_STRESS_WORKING_DIRECTORY", DEFAULT_WORKING_DIR );
-        int baseCoreBackupPort = parseInt( fromEnv( "BACKUP_STORE_COPY_INTERACTION_STRESS_BASE_CORE_BACKUP_PORT",
-                DEFAULT_BASE_CORE_BACKUP_PORT ) );
-        int baseEdgeBackupPort = parseInt( fromEnv( "BACKUP_STORE_COPY_INTERACTION_STRESS_BASE_EDGE_BACKUP_PORT",
-                DEFAULT_BASE_EDGE_BACKUP_PORT ) );
         boolean enableIndexes = parseBoolean(
                 fromEnv( "BACKUP_STORE_COPY_INTERACTION_STRESS_ENABLE_INDEXES", DEFAULT_ENABLE_INDEXES ) );
         String txPrune = fromEnv( "BACKUP_STORE_COPY_INTERACTION_STRESS_TX_PRUNE", DEFAULT_TX_PRUNE );
 
         File clusterDirectory = ensureExistsAndEmpty( new File( workingDirectory, "cluster" ) );
         File backupDirectory = ensureExistsAndEmpty( new File( workingDirectory, "backups" ) );
 
-        BiFunction<Boolean,Integer,SocketAddress> backupAddress = ( isCore, id ) ->
-                new AdvertisedSocketAddress( "localhost", (isCore ? baseCoreBackupPort : baseEdgeBackupPort) + id );
-
         Map<String,String> coreParams = enableRaftMessageLogging(
                 configureRaftLogRotationAndPruning( configureTxLogRotationAndPruning( new HashMap<>(), txPrune ) ) );
         Map<String,String> readReplicaParams = configureTxLogRotationAndPruning( new HashMap<>(), txPrune );
 
-        Map<String,IntFunction<String>> instanceCoreParams =
-                configureBackup( new HashMap<>(), id -> backupAddress.apply( true, id ) );
-        Map<String,IntFunction<String>> instanceReadReplicaParams =
-                configureBackup( new HashMap<>(), id -> backupAddress.apply( false, id ) );
-
         HazelcastDiscoveryServiceFactory discoveryServiceFactory = new HazelcastDiscoveryServiceFactory();
-        Cluster cluster =
-                new Cluster( clusterDirectory, numberOfCores, numberOfEdges, discoveryServiceFactory, coreParams,
-                        instanceCoreParams, readReplicaParams, instanceReadReplicaParams, Standard.LATEST_NAME,
-                        IpFamily.IPV4, false );
+        Cluster cluster = new Cluster( clusterDirectory, numberOfCores, numberOfEdges, discoveryServiceFactory, coreParams,
+                emptyMap(), readReplicaParams, emptyMap(), Standard.LATEST_NAME, IpFamily.IPV4, false );
 
         AtomicBoolean stopTheWorld = new AtomicBoolean();
         BooleanSupplier notExpired = untilTimeExpired( durationInMinutes, MINUTES );
@@ -146,8 +126,7 @@ public void shouldBehaveCorrectlyUnderStress() throws Exception
         Future<?> startStopWorker = service.submit(
                 new StartStopLoad( fs, pageCache, keepGoing, onFailure, cluster, numberOfCores, numberOfEdges ) );
         Future<?> backupWorker = service.submit(
-                new BackupLoad( keepGoing, onFailure, cluster, numberOfCores, numberOfEdges, backupDirectory,
-                        backupAddress ) );
+                new BackupLoad( keepGoing, onFailure, cluster, numberOfCores, numberOfEdges, backupDirectory ) );
 
         Futures.combine(workload, startStopWorker, backupWorker).get( durationInMinutes + 5, MINUTES );
     }

@@ -20,15 +20,10 @@
 package org.neo4j.causalclustering.stresstests;
 
 import java.util.Map;
-import java.util.function.IntFunction;
 
 import org.neo4j.causalclustering.core.CausalClusteringSettings;
-import org.neo4j.helpers.SocketAddress;
-import org.neo4j.kernel.impl.enterprise.configuration.OnlineBackupSettings;
 import org.neo4j.kernel.configuration.Settings;
 
-import static org.neo4j.kernel.configuration.Settings.TRUE;
-
 class ClusterConfiguration
 {
     private ClusterConfiguration()
@@ -49,12 +44,4 @@ static Map<String,String> configureRaftLogRotationAndPruning( Map<String,String>
         settings.put( CausalClusteringSettings.raft_log_pruning_strategy.name(), "keep_none" );
         return settings;
     }
-
-    static Map<String,IntFunction<String>> configureBackup( Map<String,IntFunction<String>> settings,
-            IntFunction<SocketAddress> address )
-    {
-        settings.put( OnlineBackupSettings.online_backup_enabled.name(), id -> TRUE );
-        settings.put( OnlineBackupSettings.online_backup_server.name(), id -> address.apply( id ).toString() );
-        return settings;
-    }
 }

@@ -44,7 +44,7 @@ abstract class RepeatUntilOnSelectedMemberCallable extends RepeatUntilCallable
     }
 
     @Override
-    protected final void doWork()
+    protected final void doWork() throws Exception
     {
         boolean isCore = numberOfEdges == 0 || random.nextBoolean();
         Collection<? extends ClusterMember> members = isCore ? cluster.coreMembers() : cluster.readReplicas();
@@ -53,5 +53,5 @@ protected final void doWork()
         doWorkOnMember( isCore, id );
     }
 
-    protected abstract void doWorkOnMember( boolean isCore, int id );
+    protected abstract void doWorkOnMember( boolean isCore, int id ) throws Exception;
 }

@@ -45,9 +45,9 @@ public final void run()
         catch ( Throwable t )
         {
             onFailure.run();
-            throw t;
+            throw new RuntimeException( t );
         }
     }
 
-    protected abstract void doWork();
+    protected abstract void doWork() throws Exception;
 }

0 comments on commit c2d9596

Please sign in to comment.