Skip to content

Commit

Permalink
Enabling Backup from Edge Servers.
Browse files Browse the repository at this point in the history
Historically Edge servers could not back up because of cluster state (and their transient nature).
This commit enables backup from Edge servers giving operators more deployment choices.
  • Loading branch information
jimwebber committed Aug 31, 2016
1 parent 26bacc0 commit f392e90
Show file tree
Hide file tree
Showing 3 changed files with 95 additions and 40 deletions.
Expand Up @@ -118,14 +118,6 @@ public void registerProcedures( Procedures procedures )
final DiscoveryServiceFactory discoveryServiceFactory ) final DiscoveryServiceFactory discoveryServiceFactory )
{ {
LogService logging = platformModule.logging; LogService logging = platformModule.logging;
Log userLog = logging.getUserLog( EnterpriseEdgeEditionModule.class );
if ( platformModule.config.get( OnlineBackupSettings.online_backup_enabled ) )
{
userLog.warn( "Backup is not supported on edge servers. Ignoring the configuration setting: "
+ OnlineBackupSettings.online_backup_enabled );
platformModule.config.augment( singletonMap( OnlineBackupSettings.online_backup_enabled.name(), Settings
.FALSE ) );
}


ioLimiter = new ConfigurableIOLimiter( platformModule.config ); ioLimiter = new ConfigurableIOLimiter( platformModule.config );


Expand Down
Expand Up @@ -19,10 +19,6 @@
*/ */
package org.neo4j.coreedge.backup; package org.neo4j.coreedge.backup;


import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;

import java.io.ByteArrayOutputStream; import java.io.ByteArrayOutputStream;
import java.io.File; import java.io.File;
import java.io.PrintStream; import java.io.PrintStream;
Expand All @@ -35,32 +31,39 @@
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
import java.util.stream.Stream; import java.util.stream.Stream;


import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;

import org.neo4j.backup.OnlineBackupSettings; import org.neo4j.backup.OnlineBackupSettings;
import org.neo4j.coreedge.TestStoreId; import org.neo4j.coreedge.TestStoreId;
import org.neo4j.coreedge.core.CoreEdgeClusterSettings;
import org.neo4j.coreedge.core.CoreGraphDatabase;
import org.neo4j.coreedge.discovery.Cluster; import org.neo4j.coreedge.discovery.Cluster;
import org.neo4j.coreedge.discovery.CoreClusterMember; import org.neo4j.coreedge.discovery.CoreClusterMember;
import org.neo4j.coreedge.core.CoreEdgeClusterSettings;
import org.neo4j.coreedge.identity.StoreId; import org.neo4j.coreedge.identity.StoreId;
import org.neo4j.coreedge.core.CoreGraphDatabase;
import org.neo4j.graphdb.Node; import org.neo4j.graphdb.Node;
import org.neo4j.graphdb.factory.GraphDatabaseSettings; import org.neo4j.graphdb.factory.GraphDatabaseSettings;
import org.neo4j.helpers.collection.MapUtil; import org.neo4j.helpers.collection.MapUtil;
import org.neo4j.io.fs.DefaultFileSystemAbstraction; import org.neo4j.io.fs.DefaultFileSystemAbstraction;
import org.neo4j.kernel.configuration.Config; import org.neo4j.kernel.configuration.Config;
import org.neo4j.kernel.configuration.Settings; import org.neo4j.kernel.configuration.Settings;
import org.neo4j.kernel.impl.factory.GraphDatabaseFacade;
import org.neo4j.kernel.impl.store.format.standard.StandardV3_0; import org.neo4j.kernel.impl.store.format.standard.StandardV3_0;
import org.neo4j.test.DbRepresentation; import org.neo4j.test.DbRepresentation;
import org.neo4j.test.coreedge.ClusterRule; import org.neo4j.test.coreedge.ClusterRule;
import org.neo4j.test.rule.SuppressOutput; import org.neo4j.test.rule.SuppressOutput;


import static java.util.stream.Collectors.toList; import static java.util.stream.Collectors.toList;

import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertNotEquals;

import static org.neo4j.backup.BackupEmbeddedIT.runBackupToolFromOtherJvmToGetExitCode; import static org.neo4j.backup.BackupEmbeddedIT.runBackupToolFromOtherJvmToGetExitCode;
import static org.neo4j.coreedge.TestStoreId.assertAllStoresHaveTheSameStoreId; import static org.neo4j.coreedge.TestStoreId.assertAllStoresHaveTheSameStoreId;
import static org.neo4j.graphdb.Label.label;
import static org.neo4j.coreedge.backup.ArgsBuilder.args; import static org.neo4j.coreedge.backup.ArgsBuilder.args;
import static org.neo4j.coreedge.backup.ArgsBuilder.toArray; import static org.neo4j.coreedge.backup.ArgsBuilder.toArray;
import static org.neo4j.graphdb.Label.label;


public class BackupCoreIT public class BackupCoreIT
{ {
Expand All @@ -72,7 +75,7 @@ public class BackupCoreIT
.withNumberOfCoreMembers( 3 ) .withNumberOfCoreMembers( 3 )
.withNumberOfEdgeMembers( 0 ) .withNumberOfEdgeMembers( 0 )
.withSharedCoreParam( OnlineBackupSettings.online_backup_enabled, Settings.TRUE ) .withSharedCoreParam( OnlineBackupSettings.online_backup_enabled, Settings.TRUE )
.withInstanceCoreParam( OnlineBackupSettings.online_backup_server, serverId -> (":" + (8000 + serverId) ) ); .withInstanceCoreParam( OnlineBackupSettings.online_backup_server, serverId -> (":" + (8000 + serverId)) );
private Cluster cluster; private Cluster cluster;
private File backupPath; private File backupPath;
private DefaultFileSystemAbstraction fs = new DefaultFileSystemAbstraction(); private DefaultFileSystemAbstraction fs = new DefaultFileSystemAbstraction();
Expand All @@ -88,13 +91,13 @@ public void setup() throws Exception
public void makeSureBackupCanBePerformed() throws Throwable public void makeSureBackupCanBePerformed() throws Throwable
{ {
// Run backup // Run backup
CoreGraphDatabase db = createSomeData(cluster); CoreGraphDatabase db = createSomeData( cluster );
DbRepresentation beforeChange = DbRepresentation.of( db ); DbRepresentation beforeChange = DbRepresentation.of( db );
String[] args = backupArguments(backupAddress(db), backupPath.getPath() ); String[] args = backupArguments( backupAddress( db ), backupPath.getPath() );
assertEquals( 0, runBackupToolFromOtherJvmToGetExitCode( args ) ); assertEquals( 0, runBackupToolFromOtherJvmToGetExitCode( args ) );


// Add some new data // Add some new data
DbRepresentation afterChange = DbRepresentation.of( createSomeData(cluster) ); DbRepresentation afterChange = DbRepresentation.of( createSomeData( cluster ) );


// Verify that backed up database can be started and compare representation // Verify that backed up database can be started and compare representation
DbRepresentation backupRepresentation = DbRepresentation.of( backupPath, getConfig() ); DbRepresentation backupRepresentation = DbRepresentation.of( backupPath, getConfig() );
Expand All @@ -108,9 +111,9 @@ public void makeSureBackupCanBePerformedFromAnyInstance() throws Throwable
for ( CoreClusterMember db : cluster.coreMembers() ) for ( CoreClusterMember db : cluster.coreMembers() )
{ {
// Run backup // Run backup
DbRepresentation beforeChange = DbRepresentation.of(createSomeData( cluster )); DbRepresentation beforeChange = DbRepresentation.of( createSomeData( cluster ) );
File backupPathPerCoreMachine = new File( backupPath, "" + db.id().hashCode() ); File backupPathPerCoreMachine = new File( backupPath, "" + db.id().hashCode() );
String[] args = backupArguments(backupAddress(db.database()), backupPathPerCoreMachine.getPath() ); String[] args = backupArguments( backupAddress( db.database() ), backupPathPerCoreMachine.getPath() );
assertEquals( 0, runBackupToolFromOtherJvmToGetExitCode( args ) ); assertEquals( 0, runBackupToolFromOtherJvmToGetExitCode( args ) );


// Add some new data // Add some new data
Expand All @@ -129,7 +132,7 @@ public void makeSureCoreClusterCanBeRestoredFromABackup() throws Throwable
// given // given
CoreGraphDatabase db = createSomeData( cluster ); CoreGraphDatabase db = createSomeData( cluster );
DbRepresentation beforeBackup = DbRepresentation.of( db ); DbRepresentation beforeBackup = DbRepresentation.of( db );
String[] args = backupArguments(backupAddress(db), backupPath.getPath() ); String[] args = backupArguments( backupAddress( db ), backupPath.getPath() );
assertEquals( 0, runBackupToolFromOtherJvmToGetExitCode( args ) ); assertEquals( 0, runBackupToolFromOtherJvmToGetExitCode( args ) );


// when we shutdown the cluster we lose the number of core servers so we won't go through the for loop unless // when we shutdown the cluster we lose the number of core servers so we won't go through the for loop unless
Expand All @@ -146,27 +149,29 @@ public void makeSureCoreClusterCanBeRestoredFromABackup() throws Throwable
ByteArrayOutputStream output = new ByteArrayOutputStream(); ByteArrayOutputStream output = new ByteArrayOutputStream();
PrintStream sysOut = new PrintStream( output ); PrintStream sysOut = new PrintStream( output );


Path homeDir = Paths.get(cluster.getCoreMemberById( 0 ).homeDir().getPath()); Path homeDir = Paths.get( cluster.getCoreMemberById( 0 ).homeDir().getPath() );
new RestoreNewClusterCli( homeDir, homeDir, sysOut ).execute(toArray( args().from( backupPath ) new RestoreNewClusterCli( homeDir, homeDir, sysOut ).execute( toArray( args().from( backupPath )
.database( "graph.db" ).force().build() )); .database( "graph.db" ).force().build() ) );


String seed = RestoreClusterCliTest.extractSeed( output.toString() ); String seed = RestoreClusterCliTest.extractSeed( output.toString() );


for ( int i = 1; i < numberOfCoreMembers; i++ ) for ( int i = 1; i < numberOfCoreMembers; i++ )
{ {
homeDir = Paths.get(cluster.getCoreMemberById( i ).homeDir().getPath()); homeDir = Paths.get( cluster.getCoreMemberById( i ).homeDir().getPath() );
new RestoreExistingClusterCli( homeDir, homeDir ).execute( new RestoreExistingClusterCli( homeDir, homeDir ).execute(
toArray( args().from( backupPath ).database( "graph.db" ).seed( seed ).force().build() ) ); toArray( args().from( backupPath ).database( "graph.db" ).seed( seed ).force().build() ) );
} }


cluster.start(); cluster.start();


// then // then
Collection<CoreClusterMember> coreGraphDatabases = cluster.coreMembers(); Collection<CoreClusterMember> coreGraphDatabases = cluster.coreMembers();
Stream<DbRepresentation> dbRepresentations = coreGraphDatabases.stream().map( x -> DbRepresentation.of(x.database()) ); Stream<DbRepresentation> dbRepresentations = coreGraphDatabases.stream().map( x -> DbRepresentation.of( x
.database() ) );
dbRepresentations.forEach( afterReSeed -> assertEquals( beforeBackup, afterReSeed ) ); dbRepresentations.forEach( afterReSeed -> assertEquals( beforeBackup, afterReSeed ) );


List<File> afterRestoreDbPaths = coreGraphDatabases.stream().map( CoreClusterMember::storeDir ).collect( toList() ); List<File> afterRestoreDbPaths = coreGraphDatabases.stream().map( CoreClusterMember::storeDir ).collect(
toList() );
cluster.shutdown(); cluster.shutdown();


assertAllStoresHaveTheSameStoreId( afterRestoreDbPaths, fs ); assertAllStoresHaveTheSameStoreId( afterRestoreDbPaths, fs );
Expand All @@ -176,14 +181,16 @@ public void makeSureCoreClusterCanBeRestoredFromABackup() throws Throwable


static CoreGraphDatabase createSomeData( Cluster cluster ) throws TimeoutException, InterruptedException static CoreGraphDatabase createSomeData( Cluster cluster ) throws TimeoutException, InterruptedException
{ {
return cluster.coreTx( ( db, tx ) -> { return cluster.coreTx( ( db, tx ) ->
{
Node node = db.createNode( label( "boo" ) ); Node node = db.createNode( label( "boo" ) );
node.setProperty( "foobar", "baz_bat" ); node.setProperty( "foobar", "baz_bat" );
tx.success(); tx.success();
} ).database(); } ).database();
} }


static String backupAddress(CoreGraphDatabase db) { static String backupAddress( GraphDatabaseFacade db )
{
InetSocketAddress inetSocketAddress = db.getDependencyResolver() InetSocketAddress inetSocketAddress = db.getDependencyResolver()
.resolveDependency( Config.class ).get( CoreEdgeClusterSettings.transaction_advertised_address ) .resolveDependency( Config.class ).get( CoreEdgeClusterSettings.transaction_advertised_address )
.socketAddress(); .socketAddress();
Expand Down
Expand Up @@ -19,47 +19,103 @@
*/ */
package org.neo4j.coreedge.backup; package org.neo4j.coreedge.backup;


import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;

import org.junit.Before; import org.junit.Before;
import org.junit.Rule; import org.junit.Rule;
import org.junit.Test; import org.junit.Test;


import java.io.File;

import org.neo4j.backup.OnlineBackupSettings; import org.neo4j.backup.OnlineBackupSettings;
import org.neo4j.coreedge.discovery.Cluster;
import org.neo4j.coreedge.edge.EdgeGraphDatabase;
import org.neo4j.kernel.configuration.Settings; import org.neo4j.kernel.configuration.Settings;
import org.neo4j.test.DbRepresentation;
import org.neo4j.test.coreedge.ClusterRule; import org.neo4j.test.coreedge.ClusterRule;
import org.neo4j.test.rule.SuppressOutput; import org.neo4j.test.rule.SuppressOutput;


import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;

import static org.neo4j.backup.BackupEmbeddedIT.runBackupToolFromOtherJvmToGetExitCode; import static org.neo4j.backup.BackupEmbeddedIT.runBackupToolFromOtherJvmToGetExitCode;
import static org.neo4j.coreedge.backup.BackupCoreIT.backupAddress;
import static org.neo4j.coreedge.backup.BackupCoreIT.backupArguments; import static org.neo4j.coreedge.backup.BackupCoreIT.backupArguments;
import static org.neo4j.coreedge.backup.BackupCoreIT.createSomeData;
import static org.neo4j.coreedge.backup.BackupCoreIT.getConfig;
import static org.neo4j.test.rule.SuppressOutput.suppress;


public class BackupEdgeIT public class BackupEdgeIT
{ {
@Rule @Rule
public SuppressOutput suppressOutput = SuppressOutput.suppress( SuppressOutput.System.out ); public SuppressOutput suppressOutput = suppress( SuppressOutput.System.out, SuppressOutput.System.err );


@Rule @Rule
public ClusterRule clusterRule = new ClusterRule( BackupCoreIT.class ) public ClusterRule clusterRule = new ClusterRule( BackupEdgeIT.class )
.withNumberOfCoreMembers( 3 ) .withNumberOfCoreMembers( 3 )
.withSharedCoreParam( OnlineBackupSettings.online_backup_enabled, Settings.FALSE ) .withSharedCoreParam( OnlineBackupSettings.online_backup_enabled, Settings.FALSE )
.withNumberOfEdgeMembers( 1 ) .withNumberOfEdgeMembers( 1 )
.withSharedEdgeParam( OnlineBackupSettings.online_backup_enabled, Settings.TRUE ) .withSharedEdgeParam( OnlineBackupSettings.online_backup_enabled, Settings.TRUE )
.withInstanceEdgeParam( OnlineBackupSettings.online_backup_server, serverId -> ":8000" ); .withInstanceEdgeParam( OnlineBackupSettings.online_backup_server, serverId -> ":" + findFreePort( 8000, 9000 ) );


private Cluster cluster;
private File backupPath; private File backupPath;


@Before @Before
public void setup() throws Exception public void setup() throws Exception
{ {
backupPath = clusterRule.testDirectory().cleanDirectory( "backup-db" ); backupPath = clusterRule.testDirectory().cleanDirectory( "backup-db" );
clusterRule.startCluster(); cluster = clusterRule.startCluster();
} }


@Test @Test
public void makeSureBackupCannotBePerformed() throws Throwable public void makeSureBackupCanBePerformed() throws Throwable
{ {
String[] args = backupArguments( "localhost:8000", backupPath.getPath() ); // Run backup
assertEquals( 1, runBackupToolFromOtherJvmToGetExitCode( args ) ); createSomeData( cluster );

EdgeGraphDatabase db = cluster.findAnEdgeMember().database();

DbRepresentation beforeChange = DbRepresentation.of( db );
String backupAddress = backupAddress( db );
System.out.println( backupAddress );
String[] args = backupArguments( backupAddress, backupPath.getPath() );
assertEquals( 0, runBackupToolFromOtherJvmToGetExitCode( args ) );

// Add some new data
DbRepresentation afterChange = DbRepresentation.of( createSomeData( cluster ) );

// Verify that backed up database can be started and compare representation
DbRepresentation backupRepresentation = DbRepresentation.of( backupPath, getConfig() );
assertEquals( beforeChange, backupRepresentation );
assertNotEquals( backupRepresentation, afterChange );
}

private static int findFreePort( int startRange, int endRange )
{
InetSocketAddress address = null;
RuntimeException ex = null;
for ( int port = startRange; port <= endRange; port++ )
{
address = new InetSocketAddress( port );

try
{
new ServerSocket( address.getPort(), 100, address.getAddress() ).close();
ex = null;
break;
}
catch ( IOException e )
{
ex = new RuntimeException( e );
}
}
if ( ex != null )
{
throw ex;
}
assert address != null;
return address.getPort();
} }
} }

0 comments on commit f392e90

Please sign in to comment.