diff --git a/community/common/src/main/java/org/neo4j/function/ThrowingConsumer.java b/community/common/src/main/java/org/neo4j/function/ThrowingConsumer.java index 26cfff2b6b046..2c74c472d17f0 100644 --- a/community/common/src/main/java/org/neo4j/function/ThrowingConsumer.java +++ b/community/common/src/main/java/org/neo4j/function/ThrowingConsumer.java @@ -26,7 +26,7 @@ * @param the type of the input to the operation * @param the type of exception that may be thrown from the function */ -public interface ThrowingConsumer +public interface ThrowingConsumer { /** * Performs this operation on the given argument. diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/store/MetaDataStore.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/store/MetaDataStore.java index eb9a5568cc0fb..5598d13315aba 100644 --- a/community/kernel/src/main/java/org/neo4j/kernel/impl/store/MetaDataStore.java +++ b/community/kernel/src/main/java/org/neo4j/kernel/impl/store/MetaDataStore.java @@ -49,6 +49,7 @@ import static java.lang.String.format; import static java.util.concurrent.TimeUnit.SECONDS; + import static org.neo4j.io.pagecache.PagedFile.PF_SHARED_READ_LOCK; import static org.neo4j.io.pagecache.PagedFile.PF_SHARED_WRITE_LOCK; import static org.neo4j.kernel.impl.store.format.standard.MetaDataRecordFormat.FIELD_NOT_PRESENT; @@ -316,6 +317,17 @@ public StoreId getStoreId() return new StoreId( getCreationTime(), getRandomNumber(), getStoreVersion(), getUpgradeTime(), upgradeTxIdField ); } + public static StoreId getStoreId( PageCache pageCache, File neoStore ) throws IOException + { + return new StoreId( + getRecord( pageCache, neoStore, Position.TIME ), + getRecord( pageCache, neoStore, Position.RANDOM_NUMBER ), + getRecord( pageCache, neoStore, Position.STORE_VERSION ), + getRecord( pageCache, neoStore, Position.UPGRADE_TIME ), + getRecord( pageCache, neoStore, Position.UPGRADE_TRANSACTION_ID ) + ); + } + public long getUpgradeTime() { assertNotClosed(); diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/ReplicationModule.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/ReplicationModule.java index d811f296e303e..6b32adf5af000 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/ReplicationModule.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/ReplicationModule.java @@ -45,7 +45,6 @@ public class ReplicationModule { - public static final String LAST_FLUSHED_NAME = "last-flushed"; public static final String SESSION_TRACKER_NAME = "session-tracker"; private final RaftReplicator replicator; diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/SessionTracker.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/SessionTracker.java index 6483797557c81..e81ebe6a6f345 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/SessionTracker.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/SessionTracker.java @@ -21,17 +21,15 @@ import java.io.IOException; -import org.neo4j.coreedge.core.state.snapshot.CoreStateType; import org.neo4j.coreedge.core.replication.session.GlobalSession; import org.neo4j.coreedge.core.replication.session.GlobalSessionTrackerState; import org.neo4j.coreedge.core.replication.session.LocalOperationId; -import org.neo4j.coreedge.core.state.snapshot.CoreSnapshot; import org.neo4j.coreedge.core.state.storage.StateStorage; public class SessionTracker { private final StateStorage sessionTrackerStorage; - private GlobalSessionTrackerState sessionState = new GlobalSessionTrackerState(); + private GlobalSessionTrackerState sessionState = null; public SessionTracker( StateStorage sessionTrackerStorage ) { @@ -40,7 +38,10 @@ public SessionTracker( StateStorage sessionTrackerSto public void start() { - sessionState = sessionTrackerStorage.getInitialState(); + if ( sessionState == null ) + { + sessionState = sessionTrackerStorage.getInitialState(); + } } public long getLastAppliedIndex() diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/backup/RestoreExistingClusterCli.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/backup/RestoreExistingClusterCli.java deleted file mode 100644 index dd0d1f281ce9d..0000000000000 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/backup/RestoreExistingClusterCli.java +++ /dev/null @@ -1,148 +0,0 @@ -/* - * Copyright (c) 2002-2016 "Neo Technology," - * Network Engine for Objects in Lund AB [http://neotechnology.com] - * - * This file is part of Neo4j. - * - * Neo4j is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License as - * published by the Free Software Foundation, either version 3 of the - * License, or (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ -package org.neo4j.coreedge.backup; - -import java.io.File; -import java.io.IOException; -import java.nio.file.Path; -import java.util.ArrayList; -import java.util.List; -import java.util.Optional; - -import org.neo4j.commandline.admin.AdminCommand; -import org.neo4j.commandline.admin.CommandFailed; -import org.neo4j.commandline.admin.IncorrectUsage; -import org.neo4j.commandline.admin.OutsideWorld; -import org.neo4j.coreedge.convert.ConversionVerifier; -import org.neo4j.coreedge.convert.ConvertClassicStoreToCoreCommand; -import org.neo4j.dbms.DatabaseManagementSystemSettings; -import org.neo4j.graphdb.factory.GraphDatabaseSettings; -import org.neo4j.helpers.Args; -import org.neo4j.io.fs.DefaultFileSystemAbstraction; -import org.neo4j.kernel.api.exceptions.TransactionFailureException; -import org.neo4j.kernel.configuration.Config; -import org.neo4j.kernel.impl.util.Converters; -import org.neo4j.restore.RestoreDatabaseCommand; -import org.neo4j.server.configuration.ConfigLoader; - -import static org.neo4j.dbms.DatabaseManagementSystemSettings.database_path; -import static org.neo4j.graphdb.factory.GraphDatabaseSettings.record_format; -import static org.neo4j.helpers.collection.MapUtil.stringMap; - -public class RestoreExistingClusterCli implements AdminCommand -{ - public static class Provider extends AdminCommand.Provider - { - public Provider() - { - super( "restore-existing-cluster" ); - } - - @Override - public Optional arguments() - { - return Optional.of( "--from= --database= --cluster-seed= " + - "[--force]" ); - } - - @Override - public String description() - { - return "Restores a database backed up using the neo4j-backup tool to be used as an instance of a " + - "new cluster. Takes the seed output from `restore-new-cluster` as an argument. "; - } - - @Override - public AdminCommand create( Path homeDir, Path configDir, OutsideWorld outsideWorld ) - { - return new RestoreExistingClusterCli( homeDir, configDir ); - } - } - - private final Path homeDir; - private final Path configDir; - - public RestoreExistingClusterCli( Path homeDir, Path configDir ) - { - this.homeDir = homeDir; - this.configDir = configDir; - } - - @Override - public void execute( String[] incomingArguments ) throws IncorrectUsage, CommandFailed - { - String databaseName; - String fromPath; - String clusterSeed; - boolean forceOverwrite; - Args args = Args.parse( incomingArguments ); - try - { - databaseName = args.interpretOption( "database", Converters.mandatory(), s -> s ); - fromPath = args.interpretOption( "from", Converters.mandatory(), s -> s ); - clusterSeed = args.interpretOption( "cluster-seed", Converters.mandatory(), s -> s ); - forceOverwrite = args.getBoolean( "force", Boolean.FALSE, true ); - } - catch ( IllegalArgumentException e ) - { - throw new IncorrectUsage( e.getMessage() ); - } - try - { - Config config = loadNeo4jConfig( homeDir, configDir, databaseName ); - restoreDatabase( databaseName, fromPath, forceOverwrite, config ); - convertStore( config, clusterSeed ); - } - catch ( IOException | TransactionFailureException e ) - { - throw new RuntimeException( e ); - } - } - - private static Config loadNeo4jConfig( Path homeDir, Path configDir, String databaseName ) - { - ConfigLoader configLoader = new ConfigLoader( settings() ); - Config config = configLoader.loadConfig( Optional.of( homeDir.toFile() ), - Optional.of( configDir.resolve( "neo4j.conf" ).toFile() ) ); - - return config.with( stringMap( DatabaseManagementSystemSettings.active_database.name(), databaseName ) ); - } - - private static void convertStore( Config config, String seed ) throws IOException, TransactionFailureException - { - ConvertClassicStoreToCoreCommand convert = new ConvertClassicStoreToCoreCommand( new ConversionVerifier() ); - convert.convert( config.get( database_path ), config.get( record_format ), seed ); - } - - private static void restoreDatabase( String databaseName, String fromPath, boolean forceOverwrite, Config config ) - throws IOException - { - new RestoreDatabaseCommand( new DefaultFileSystemAbstraction(), new File( fromPath ), config, databaseName, - forceOverwrite ).execute(); - } - - public static List> settings() - { - List> settings = new ArrayList<>(); - settings.add( GraphDatabaseSettings.class ); - settings.add( DatabaseManagementSystemSettings.class ); - return settings; - } -} diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/backup/RestoreNewClusterCli.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/backup/RestoreNewClusterCli.java deleted file mode 100644 index fa3eca6cefdea..0000000000000 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/backup/RestoreNewClusterCli.java +++ /dev/null @@ -1,165 +0,0 @@ - -/* - * Copyright (c) 2002-2016 "Neo Technology," - * Network Engine for Objects in Lund AB [http://neotechnology.com] - * - * This file is part of Neo4j. - * - * Neo4j is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License as - * published by the Free Software Foundation, either version 3 of the - * License, or (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ -package org.neo4j.coreedge.backup; - -import java.io.File; -import java.io.IOException; -import java.io.PrintStream; -import java.nio.file.Path; -import java.util.ArrayList; -import java.util.List; -import java.util.Optional; - -import org.neo4j.commandline.admin.AdminCommand; -import org.neo4j.commandline.admin.CommandFailed; -import org.neo4j.commandline.admin.IncorrectUsage; -import org.neo4j.commandline.admin.OutsideWorld; -import org.neo4j.coreedge.convert.ConversionVerifier; -import org.neo4j.coreedge.convert.ConvertClassicStoreToCoreCommand; -import org.neo4j.coreedge.convert.GenerateClusterSeedCommand; -import org.neo4j.dbms.DatabaseManagementSystemSettings; -import org.neo4j.graphdb.factory.GraphDatabaseSettings; -import org.neo4j.helpers.Args; -import org.neo4j.io.fs.DefaultFileSystemAbstraction; -import org.neo4j.kernel.api.exceptions.TransactionFailureException; -import org.neo4j.kernel.configuration.Config; -import org.neo4j.kernel.impl.util.Converters; -import org.neo4j.restore.RestoreDatabaseCommand; -import org.neo4j.server.configuration.ConfigLoader; - -import static org.neo4j.dbms.DatabaseManagementSystemSettings.database_path; -import static org.neo4j.graphdb.factory.GraphDatabaseSettings.record_format; -import static org.neo4j.helpers.collection.MapUtil.stringMap; - -public class RestoreNewClusterCli implements AdminCommand -{ - public static class Provider extends AdminCommand.Provider - { - public Provider() - { - super( "restore-new-cluster" ); - } - - @Override - public Optional arguments() - { - return Optional.of( "--from= --database= [--force]" ); - } - - @Override - public String description() - { - return "Restores a database backed up using the neo4j-backup tool to be used as the first instance of a " + - "new cluster."; - } - - @Override - public AdminCommand create( Path homeDir, Path configDir, OutsideWorld outsideWorld ) - { - return new RestoreNewClusterCli( homeDir, configDir ); - } - } - - private final Path homeDir; - private final Path configDir; - private final PrintStream out; - - public RestoreNewClusterCli( Path homeDir, Path configDir, PrintStream out ) - { - this.homeDir = homeDir; - this.configDir = configDir; - this.out = out; - } - - public RestoreNewClusterCli(Path homeDir, Path configDir) - { - this( homeDir, configDir, System.out ); - } - - @Override - public void execute( String[] incomingArguments ) throws IncorrectUsage, CommandFailed - { - String databaseName; - String fromPath; - boolean forceOverwrite; - - Args args = Args.parse( incomingArguments ); - try - { - databaseName = args.interpretOption( "database", Converters.mandatory(), s -> s ); - fromPath = args.interpretOption( "from", Converters.mandatory(), s -> s ); - forceOverwrite = args.getBoolean( "force", Boolean.FALSE, true ); - } - catch ( IllegalArgumentException e ) - { - throw new IncorrectUsage( e.getMessage() ); - } - - try - { - Config config = loadNeo4jConfig( homeDir, configDir, databaseName ); - restoreDatabase( databaseName, fromPath, forceOverwrite, config ); - String seed = generateSeed( config ); - convertStore( config, seed ); - out.println( "Cluster Seed: " + seed ); - } - catch ( IOException | TransactionFailureException e ) - { - throw new RuntimeException( e ); - } - } - - private static Config loadNeo4jConfig( Path homeDir, Path configDir, String databaseName ) - { - ConfigLoader configLoader = new ConfigLoader( settings() ); - Config config = configLoader.loadConfig( - Optional.of( homeDir.toFile() ), - Optional.of( configDir.resolve( "neo4j.conf" ).toFile() )); - - return config.with( stringMap( DatabaseManagementSystemSettings.active_database.name(), databaseName ) ); - } - - private static void convertStore( Config config, String seed ) throws IOException, TransactionFailureException - { - ConvertClassicStoreToCoreCommand convert = new ConvertClassicStoreToCoreCommand( new ConversionVerifier() ); - convert.convert( config.get( database_path ), config.get( record_format ), seed ); - } - - private static String generateSeed( Config config ) throws IOException - { - return new GenerateClusterSeedCommand().generate( config.get( database_path ) ).getConversionId(); - } - - private static void restoreDatabase( String databaseName, String fromPath, boolean forceOverwrite, Config config ) - throws IOException - { - new RestoreDatabaseCommand( new DefaultFileSystemAbstraction(), - new File( fromPath ), config, databaseName, forceOverwrite ).execute(); - } - - private static List> settings() - { - List> settings = new ArrayList<>(); - settings.add( GraphDatabaseSettings.class ); - settings.add( DatabaseManagementSystemSettings.class ); - return settings; - } -} diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/storecopy/LocalDatabase.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/storecopy/LocalDatabase.java index 86cb1538fd528..0f78fa7e54374 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/storecopy/LocalDatabase.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/storecopy/LocalDatabase.java @@ -24,12 +24,15 @@ import java.util.function.Supplier; import org.neo4j.coreedge.identity.StoreId; +import org.neo4j.io.pagecache.PageCache; +import org.neo4j.kernel.impl.store.MetaDataStore; +import org.neo4j.kernel.impl.transaction.log.ReadOnlyTransactionIdStore; import org.neo4j.kernel.impl.transaction.log.TransactionIdStore; import org.neo4j.kernel.impl.transaction.state.DataSourceManager; import org.neo4j.kernel.internal.DatabaseHealth; import org.neo4j.kernel.lifecycle.Lifecycle; -import org.neo4j.logging.Log; -import org.neo4j.logging.LogProvider; + +import static org.neo4j.kernel.impl.transaction.log.TransactionIdStore.BASE_TX_ID; public class LocalDatabase implements Supplier, Lifecycle { @@ -37,26 +40,23 @@ public class LocalDatabase implements Supplier, Lifecycle private final StoreFiles storeFiles; private final DataSourceManager dataSourceManager; - private final Supplier transactionIdStoreSupplier; + private final PageCache pageCache; private final Supplier databaseHealthSupplier; - private final Log log; private volatile StoreId storeId; private volatile DatabaseHealth databaseHealth; + private boolean started = false; public LocalDatabase( File storeDir, StoreFiles storeFiles, DataSourceManager dataSourceManager, - Supplier transactionIdStoreSupplier, - Supplier databaseHealthSupplier, - LogProvider logProvider ) + PageCache pageCache, + Supplier databaseHealthSupplier ) { this.storeDir = storeDir; this.storeFiles = storeFiles; this.dataSourceManager = dataSourceManager; - this.transactionIdStoreSupplier = transactionIdStoreSupplier; + this.pageCache = pageCache; this.databaseHealthSupplier = databaseHealthSupplier; - this.log = logProvider.getLog( getClass() ); - this.storeId = null; } @Override @@ -66,21 +66,19 @@ public void init() throws Throwable } @Override - public void start() throws Throwable + public synchronized void start() throws Throwable { + storeId = readStoreIdFromDisk(); dataSourceManager.start(); - org.neo4j.kernel.impl.store.StoreId kernelStoreId = dataSourceManager.getDataSource().getStoreId(); - storeId = new StoreId( kernelStoreId.getCreationTime(), kernelStoreId.getRandomId(), - kernelStoreId.getUpgradeTime(), kernelStoreId.getUpgradeId() ); - log.info( "My StoreId is: " + storeId ); + started = true; } @Override - public void stop() throws Throwable + public synchronized void stop() throws Throwable { - this.storeId = null; this.databaseHealth = null; dataSourceManager.stop(); + started = false; } @Override @@ -89,9 +87,31 @@ public void shutdown() throws Throwable dataSourceManager.shutdown(); } - public StoreId storeId() + public synchronized StoreId storeId() + { + if ( started ) + { + return storeId; + } + else + { + return readStoreIdFromDisk(); + } + } + + private StoreId readStoreIdFromDisk() { - return storeId; + try + { + File neoStoreFile = new File( storeDir, MetaDataStore.DEFAULT_NAME ); + org.neo4j.kernel.impl.store.StoreId kernelStoreId = MetaDataStore.getStoreId( pageCache, neoStoreFile ); + return new StoreId( kernelStoreId.getCreationTime(), kernelStoreId.getRandomId(), + kernelStoreId.getUpgradeTime(), kernelStoreId.getUpgradeId() ); + } + catch ( IOException e ) + { + return null; + } } public void panic( Throwable cause ) @@ -118,10 +138,11 @@ public void delete() throws IOException storeFiles.delete( storeDir ); } - public boolean isEmpty() + public boolean isEmpty() throws IOException { // TODO: Below doesn't work for an imported store. Need to check high-ids as well. - return transactionIdStoreSupplier.get().getLastCommittedTransactionId() == TransactionIdStore.BASE_TX_ID; + ReadOnlyTransactionIdStore readOnlyTransactionIdStore = new ReadOnlyTransactionIdStore( pageCache, storeDir ); + return readOnlyTransactionIdStore.getLastCommittedTransactionId() <= BASE_TX_ID; } @Override diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/storecopy/StoreCopyClient.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/storecopy/StoreCopyClient.java index 4c37a116a0d9a..8b55d6f801a7e 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/storecopy/StoreCopyClient.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/storecopy/StoreCopyClient.java @@ -102,7 +102,6 @@ public void onGetStoreIdResponse( CompletableFuture signal, } catch ( CatchUpClientException | NoKnownAddressesException e ) { - e.printStackTrace(); throw new StoreIdDownloadFailedException( e ); } } diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/storecopy/StoreFetcher.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/storecopy/StoreFetcher.java index af06696ad4024..fccb07c64a9ca 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/storecopy/StoreFetcher.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/storecopy/StoreFetcher.java @@ -109,7 +109,7 @@ public void copyStore( MemberId from, StoreId expectedStoreId, File destDir ) th public StoreId getStoreIdOf( MemberId from ) throws StoreIdDownloadFailedException { - String operation = "get store id"; + String operation = "get store id from " + from; long retryInterval = 5_000; int attempts = 0; @@ -139,6 +139,6 @@ public StoreId getStoreIdOf( MemberId from ) throws StoreIdDownloadFailedExcepti } } - throw new StoreIdDownloadFailedException( "Failed to get store id after " + (attempts - 1) + " attempts" ); + throw new StoreIdDownloadFailedException( "Failed to " + operation + " after " + (attempts - 1) + " attempts" ); } } diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/convert/ConvertNonCoreEdgeStoreCli.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/convert/ConvertNonCoreEdgeStoreCli.java deleted file mode 100644 index 69dc76de87f4a..0000000000000 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/convert/ConvertNonCoreEdgeStoreCli.java +++ /dev/null @@ -1,92 +0,0 @@ -/* - * Copyright (c) 2002-2016 "Neo Technology," - * Network Engine for Objects in Lund AB [http://neotechnology.com] - * - * This file is part of Neo4j. - * - * Neo4j is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License as - * published by the Free Software Foundation, either version 3 of the - * License, or (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ -package org.neo4j.coreedge.convert; - -import java.io.File; -import java.io.PrintStream; -import java.util.ArrayList; -import java.util.List; -import java.util.Optional; - -import org.neo4j.dbms.DatabaseManagementSystemSettings; -import org.neo4j.graphdb.factory.GraphDatabaseSettings; -import org.neo4j.helpers.Args; -import org.neo4j.helpers.ArrayUtil; -import org.neo4j.kernel.configuration.Config; -import org.neo4j.kernel.impl.util.Converters; -import org.neo4j.server.configuration.ConfigLoader; - -import static org.neo4j.helpers.collection.MapUtil.stringMap; - -public class ConvertNonCoreEdgeStoreCli -{ - public static void main( String[] incomingArguments ) throws Throwable - { - Args args = Args.parse( incomingArguments ); - if ( ArrayUtil.isEmpty( incomingArguments ) ) - { - printUsage( System.out ); - System.exit( 1 ); - } - - File homeDir = args.interpretOption( "home-dir", Converters.mandatory(), File::new ); - String databaseName = args.interpretOption( "database", Converters.mandatory(), s -> s ); - String configPath = args.interpretOption( "config", Converters.mandatory(), s -> s ); - String clusterSeed = args.interpretOption( "cluster-seed", Converters.mandatory(), s -> s ); - - Config config = createConfig( homeDir, databaseName, configPath ); - - new ConvertClassicStoreToCoreCommand( new ConversionVerifier() ).convert( - config.get( DatabaseManagementSystemSettings.database_path ), - config.get( GraphDatabaseSettings.record_format ), - clusterSeed ); - } - - private static Config createConfig( File homeDir, String databaseName, String configPath ) - { - return new ConfigLoader( settings() ).loadConfig( Optional.of( homeDir ), - Optional.of( new File( configPath, "neo4j.conf" ) ) ) - .with( stringMap( DatabaseManagementSystemSettings.active_database.name(), databaseName ) ); - } - - private static List> settings() - { - List> settings = new ArrayList<>(); - settings.add( GraphDatabaseSettings.class ); - settings.add( DatabaseManagementSystemSettings.class ); - return settings; - } - - private static void printUsage( PrintStream out ) - { - out.println( "Neo4j Classic to Core Format Conversion Tool" ); - for ( String line : Args.splitLongLine( "The classic to core conversion tool is used to convert a classic" - + "Neo4j store into one which has a core friendly format.", 80 ) ) - { - out.println( "\t" + line ); - } - - out.println( "Usage:" ); - out.println( "--home-dir " ); - out.println( "--database " ); - out.println( "--config " ); - out.println( "--cluster-seed " ); - } -} diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/EnterpriseCoreEditionModule.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/EnterpriseCoreEditionModule.java index 57d299fe7504d..3a98a9fb8219b 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/EnterpriseCoreEditionModule.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/EnterpriseCoreEditionModule.java @@ -77,7 +77,6 @@ import org.neo4j.kernel.impl.logging.LogService; import org.neo4j.kernel.impl.proc.Procedures; import org.neo4j.kernel.impl.transaction.TransactionHeaderInformationFactory; -import org.neo4j.kernel.impl.transaction.log.TransactionIdStore; import org.neo4j.kernel.impl.util.Dependencies; import org.neo4j.kernel.internal.DatabaseHealth; import org.neo4j.kernel.internal.DefaultKernelData; @@ -146,14 +145,13 @@ protected Log authManagerLog() LocalDatabase localDatabase = new LocalDatabase( platformModule.storeDir, new StoreFiles( new DefaultFileSystemAbstraction() ), platformModule.dataSourceManager, - platformModule.dependencies.provideDependency( TransactionIdStore.class ), databaseHealthSupplier, - logProvider ); - - life.add( localDatabase ); + platformModule.pageCache, + databaseHealthSupplier ); IdentityModule identityModule = new IdentityModule( platformModule, clusterStateDirectory ); - ClusteringModule clusteringModule = new ClusteringModule( discoveryServiceFactory, identityModule.myself(), platformModule, clusterStateDirectory ); + ClusteringModule clusteringModule = new ClusteringModule( discoveryServiceFactory, identityModule.myself(), + platformModule ); topologyService = clusteringModule.topologyService(); long logThresholdMillis = config.get( CoreEdgeClusterSettings.unknown_address_logging_throttle ); diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/IdentityModule.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/IdentityModule.java index 2292c97aa912c..2596f14883ea0 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/IdentityModule.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/IdentityModule.java @@ -23,6 +23,7 @@ import java.io.IOException; import java.util.UUID; +import org.neo4j.coreedge.core.state.storage.SimpleFileStorage; import org.neo4j.coreedge.core.state.storage.SimpleStorage; import org.neo4j.coreedge.identity.MemberId; import org.neo4j.io.fs.FileSystemAbstraction; @@ -42,7 +43,7 @@ public class IdentityModule Log log = logProvider.getLog( getClass() ); - SimpleStorage memberIdStorage = new SimpleStorage<>( fileSystem, clusterStateDirectory, + SimpleStorage memberIdStorage = new SimpleFileStorage<>( fileSystem, clusterStateDirectory, CORE_MEMBER_ID_NAME, new MemberId.Marshal(), logProvider ); try diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/consensus/membership/MembershipEntry.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/consensus/membership/MembershipEntry.java index 081996323c84d..fb5eba5efb084 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/consensus/membership/MembershipEntry.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/consensus/membership/MembershipEntry.java @@ -38,7 +38,7 @@ public class MembershipEntry private long logIndex; private Set members; - MembershipEntry( long logIndex, Set members ) + public MembershipEntry( long logIndex, Set members ) { this.members = members; this.logIndex = logIndex; diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/replication/session/GlobalSessionTrackerState.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/replication/session/GlobalSessionTrackerState.java index b8b048f2942be..64393fb6133f2 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/replication/session/GlobalSessionTrackerState.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/replication/session/GlobalSessionTrackerState.java @@ -22,6 +22,7 @@ import java.io.IOException; import java.util.HashMap; import java.util.Map; +import java.util.Objects; import java.util.UUID; import org.neo4j.coreedge.messaging.marshalling.ChannelMarshal; @@ -99,6 +100,7 @@ private LocalSessionTracker validateGlobalSessionAndGetLocalSessionTracker( Glob public GlobalSessionTrackerState newInstance() { GlobalSessionTrackerState copy = new GlobalSessionTrackerState(); + copy.logIndex = logIndex; for ( Map.Entry entry : sessionTrackers.entrySet() ) { copy.sessionTrackers.put( entry.getKey(), entry.getValue().newInstance() ); @@ -106,6 +108,28 @@ public GlobalSessionTrackerState newInstance() return copy; } + @Override + public boolean equals( Object o ) + { + if ( this == o ) + { + return true; + } + if ( o == null || getClass() != o.getClass() ) + { + return false; + } + GlobalSessionTrackerState that = (GlobalSessionTrackerState) o; + return logIndex == that.logIndex && + Objects.equals( sessionTrackers, that.sessionTrackers ); + } + + @Override + public int hashCode() + { + return Objects.hash( sessionTrackers, logIndex ); + } + @Override public String toString() { diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/server/CoreServerModule.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/server/CoreServerModule.java index fec6d408dbadd..91e8874747597 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/server/CoreServerModule.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/server/CoreServerModule.java @@ -41,15 +41,20 @@ import org.neo4j.coreedge.core.consensus.log.pruning.PruningScheduler; import org.neo4j.coreedge.core.consensus.membership.MembershipWaiter; import org.neo4j.coreedge.core.consensus.membership.MembershipWaiterLifecycle; +import org.neo4j.coreedge.core.state.BindingService; import org.neo4j.coreedge.core.state.CommandApplicationProcess; +import org.neo4j.coreedge.core.state.CoreBootstrapper; import org.neo4j.coreedge.core.state.CoreState; import org.neo4j.coreedge.core.state.CoreStateApplier; import org.neo4j.coreedge.core.state.LongIndexMarshal; import org.neo4j.coreedge.core.state.machines.CoreStateMachinesModule; import org.neo4j.coreedge.core.state.snapshot.CoreStateDownloader; import org.neo4j.coreedge.core.state.storage.DurableStateStorage; +import org.neo4j.coreedge.core.state.storage.SimpleFileStorage; +import org.neo4j.coreedge.core.state.storage.SimpleStorage; import org.neo4j.coreedge.core.state.storage.StateStorage; import org.neo4j.coreedge.discovery.CoreTopologyService; +import org.neo4j.coreedge.identity.ClusterId; import org.neo4j.coreedge.identity.MemberId; import org.neo4j.coreedge.logging.MessageLogger; import org.neo4j.coreedge.messaging.CoreReplicatedContentMarshal; @@ -68,10 +73,15 @@ import org.neo4j.logging.LogProvider; import org.neo4j.time.Clocks; +import static java.lang.Thread.sleep; + import static org.neo4j.kernel.impl.util.JobScheduler.SchedulingStrategy.NEW_THREAD; public class CoreServerModule { + private static final String CLUSTER_ID_NAME = "cluster-id"; + public static final String LAST_FLUSHED_NAME = "last-flushed"; + public final MembershipWaiterLifecycle membershipWaiterLifecycle; public CoreServerModule( MemberId myself, final PlatformModule platformModule, ConsensusModule consensusModule, @@ -94,7 +104,7 @@ public CoreServerModule( MemberId myself, final PlatformModule platformModule, C StateStorage lastFlushedStorage; lastFlushedStorage = life.add( - new DurableStateStorage<>( fileSystem, clusterStateDirectory, ReplicationModule.LAST_FLUSHED_NAME, + new DurableStateStorage<>( fileSystem, clusterStateDirectory, LAST_FLUSHED_NAME, new LongIndexMarshal(), config.get( CoreEdgeClusterSettings.last_flushed_state_size ), logProvider ) ); @@ -120,10 +130,20 @@ public CoreServerModule( MemberId myself, final PlatformModule platformModule, C CoreStateDownloader downloader = new CoreStateDownloader( localDatabase, storeFetcher, catchUpClient, logProvider, copiedStoreRecovery ); + SimpleStorage clusterIdStorage = new SimpleFileStorage<>( fileSystem, clusterStateDirectory, + CLUSTER_ID_NAME, new ClusterId.Marshal(), logProvider ); + + CoreBootstrapper coreBootstrapper = new CoreBootstrapper( platformModule.storeDir, platformModule.pageCache, + fileSystem, config ); + + BindingService bindingService = new BindingService( clusterIdStorage, discoveryService, logProvider, + Clocks.systemClock(), () -> sleep( 100 ), 300_000, coreBootstrapper ); + CoreState coreState = new CoreState( consensusModule.raftMachine(), localDatabase, logProvider, downloader, + bindingService, new CommandApplicationProcess( coreStateMachinesModule.coreStateMachines, consensusModule.raftLog(), config.get( CoreEdgeClusterSettings.state_machine_apply_max_batch_size ), config.get( CoreEdgeClusterSettings.state_machine_flush_window_size ), @@ -159,10 +179,10 @@ public CoreServerModule( MemberId myself, final PlatformModule platformModule, C new DataSourceSupplier( platformModule ), new CheckpointerSupplier( platformModule.dependencies ), coreState, config, platformModule.monitors ); - life.add( coreState ); + life.add( raftServer ); life.add( new ContinuousJob( platformModule.jobScheduler, new JobScheduler.Group( "raft-batch-handler", NEW_THREAD ), batchingMessageHandler, logProvider ) ); - life.add( raftServer ); + life.add( coreState ); life.add( catchupServer ); } } diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/state/BindingProcess.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/state/BindingProcess.java deleted file mode 100644 index f4760f60922cd..0000000000000 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/state/BindingProcess.java +++ /dev/null @@ -1,73 +0,0 @@ -/* - * Copyright (c) 2002-2016 "Neo Technology," - * Network Engine for Objects in Lund AB [http://neotechnology.com] - * - * This file is part of Neo4j. - * - * Neo4j is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License as - * published by the Free Software Foundation, either version 3 of the - * License, or (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ -package org.neo4j.coreedge.core.state; - -import java.util.UUID; -import java.util.concurrent.TimeoutException; - -import org.neo4j.coreedge.discovery.CoreTopology; -import org.neo4j.coreedge.identity.ClusterId; -import org.neo4j.logging.Log; - -class BindingProcess -{ - private final ClusterId localClusterId; - private final Log log; - - BindingProcess( ClusterId localClusterId, Log log ) - { - this.localClusterId = localClusterId; - this.log = log; - } - - ClusterId attempt( CoreTopology topology ) throws InterruptedException, TimeoutException, BindingException - { - ClusterId commonClusterId = topology.clusterId(); - - if ( commonClusterId != null ) - { - if ( localClusterId == null ) - { - log.info( "Discovered cluster id: " + commonClusterId ); - } - else if ( commonClusterId.equals( localClusterId ) ) - { - log.info( "Found matching cluster id: " + commonClusterId ); - } - else - { - throw new BindingException( String.format( "Cluster mismatch. Is the configuration correct? " + - "Expected: %s Discovered: %s", localClusterId, commonClusterId ) ); - } - } - else if ( localClusterId != null ) - { - commonClusterId = localClusterId; - log.info( "No common cluster id found, using local: " + commonClusterId ); - } - else if ( topology.canBeBootstrapped() ) - { - commonClusterId = new ClusterId( UUID.randomUUID() ); - log.info( "Creating new cluster id: " + commonClusterId ); - } - - return commonClusterId; - } -} diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/state/BindingService.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/state/BindingService.java index a786ce9806e00..3b6c6ebf5c59c 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/state/BindingService.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/state/BindingService.java @@ -21,53 +21,42 @@ import java.io.IOException; import java.time.Clock; -import java.util.Objects; +import java.util.UUID; import java.util.concurrent.TimeoutException; +import org.neo4j.coreedge.core.state.snapshot.CoreSnapshot; import org.neo4j.coreedge.core.state.storage.SimpleStorage; import org.neo4j.coreedge.discovery.CoreTopology; import org.neo4j.coreedge.discovery.CoreTopologyService; import org.neo4j.coreedge.identity.ClusterId; import org.neo4j.function.ThrowingAction; -import org.neo4j.kernel.lifecycle.LifecycleAdapter; +import org.neo4j.function.ThrowingConsumer; import org.neo4j.logging.Log; import org.neo4j.logging.LogProvider; -class BindingService extends LifecycleAdapter +public class BindingService { private final SimpleStorage clusterIdStorage; private final CoreTopologyService topologyService; + private final CoreBootstrapper coreBootstrapper; private final Log log; private final Clock clock; private final ThrowingAction retryWaiter; private final long timeoutMillis; - private ClusterId boundClusterId; - - BindingService( SimpleStorage clusterIdStorage, CoreTopologyService topologyService, LogProvider logProvider, Clock clock, ThrowingAction retryWaiter, long timeoutMillis ) + public BindingService( SimpleStorage clusterIdStorage, CoreTopologyService topologyService, + LogProvider logProvider, Clock clock, ThrowingAction retryWaiter, + long timeoutMillis, CoreBootstrapper coreBootstrapper ) { this.clusterIdStorage = clusterIdStorage; this.topologyService = topologyService; + this.coreBootstrapper = coreBootstrapper; this.log = logProvider.getLog( getClass() ); this.clock = clock; this.retryWaiter = retryWaiter; this.timeoutMillis = timeoutMillis; } - @Override - public void start() throws Throwable - { - boundClusterId = bindToCluster(); - } - - /** - * Returns the bound cluster ID. If the binding service fails to bind, then it will fail to start. - */ - ClusterId clusterId() - { - return Objects.requireNonNull( boundClusterId, "You must not ask for the cluster ID before the binding service has been started." ); - } - /** * The cluster binding process tries to establish a common cluster ID. If there is no common cluster ID * then a single instance will eventually create one and publish it through the underlying topology service. @@ -77,44 +66,64 @@ ClusterId clusterId() * @throws InterruptedException If the process gets interrupted. * @throws TimeoutException If the process times out. */ - private ClusterId bindToCluster() throws IOException, InterruptedException, TimeoutException, BindingException + ClusterId bindToCluster( ThrowingConsumer snapshotInstaller ) throws Throwable { - ClusterId localClusterId = clusterIdStorage.exists() ? clusterIdStorage.readState() : null; - BindingProcess binder = new BindingProcess( localClusterId, log ); - - long endTime = clock.millis() + timeoutMillis; - - CoreTopology topology = topologyService.coreServers(); - ClusterId commonClusterId; - - while ( (commonClusterId = binder.attempt( topology )) == null ) + if ( clusterIdStorage.exists() ) { - if ( clock.millis() < endTime ) + ClusterId localClusterId = clusterIdStorage.readState(); + publishClusterId( localClusterId ); + return localClusterId; + } + else + { + ClusterId commonClusterId; + CoreTopology topology = topologyService.coreServers(); + if ( topology.canBeBootstrapped() ) { - retryWaiter.apply(); - topology = topologyService.coreServers(); + commonClusterId = new ClusterId( UUID.randomUUID() ); + CoreSnapshot snapshot = coreBootstrapper.bootstrap( topology.members() ); + + snapshotInstaller.accept( snapshot ); + publishClusterId( commonClusterId ); } else { - throw new TimeoutException( "Failed binding to cluster in time. Last topology was: " + topology ); + long endTime = clock.millis() + timeoutMillis; + + log.info( "Attempting to bind to : " + topology ); + while ( (commonClusterId = topology.clusterId()) == null ) + { + if ( clock.millis() < endTime ) + { + retryWaiter.apply(); + topology = topologyService.coreServers(); + } + else + { + throw new TimeoutException( "Failed binding to cluster in time. Last topology was: " + + topology ); + } + } + + log.info( "Bound to cluster: " + commonClusterId ); } - } - if ( localClusterId == null ) - { clusterIdStorage.writeState( commonClusterId ); + + return commonClusterId; } + } - boolean success = topologyService.casClusterId( commonClusterId ); + private void publishClusterId( ClusterId localClusterId ) throws BindingException + { + boolean success = topologyService.casClusterId( localClusterId ); if ( !success ) { - throw new BindingException( "Failed to publish: " + commonClusterId ); + throw new BindingException( "Failed to publish: " + localClusterId ); } else { - log.info( "Published: " + commonClusterId ); + log.info( "Published: " + localClusterId ); } - - return commonClusterId; } } diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/state/ClusteringModule.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/state/ClusteringModule.java index c06191049fbb4..40efa93f7ca3c 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/state/ClusteringModule.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/state/ClusteringModule.java @@ -19,47 +19,32 @@ */ package org.neo4j.coreedge.core.state; -import java.io.File; - -import org.neo4j.coreedge.core.state.storage.SimpleStorage; import org.neo4j.coreedge.discovery.CoreTopologyService; import org.neo4j.coreedge.discovery.DiscoveryServiceFactory; -import org.neo4j.coreedge.identity.ClusterId; import org.neo4j.coreedge.identity.MemberId; -import org.neo4j.io.fs.FileSystemAbstraction; import org.neo4j.kernel.configuration.Config; import org.neo4j.kernel.impl.factory.PlatformModule; import org.neo4j.kernel.impl.logging.LogService; import org.neo4j.kernel.impl.util.Dependencies; import org.neo4j.kernel.lifecycle.LifeSupport; import org.neo4j.logging.LogProvider; -import org.neo4j.time.Clocks; - -import static java.lang.Thread.sleep; public class ClusteringModule { - private static final String CLUSTER_ID_NAME = "cluster-id"; private final CoreTopologyService topologyService; - public ClusteringModule( DiscoveryServiceFactory discoveryServiceFactory, MemberId myself, PlatformModule platformModule, File clusterStateDirectory ) + public ClusteringModule( DiscoveryServiceFactory discoveryServiceFactory, MemberId myself, + PlatformModule platformModule ) { LifeSupport life = platformModule.life; Config config = platformModule.config; LogProvider logProvider = platformModule.logging.getInternalLogProvider(); LogProvider userLogProvider = platformModule.logging.getUserLogProvider(); Dependencies dependencies = platformModule.dependencies; - FileSystemAbstraction fileSystem = platformModule.fileSystem; - - SimpleStorage clusterIdStorage = new SimpleStorage<>( fileSystem, clusterStateDirectory, - CLUSTER_ID_NAME, new ClusterId.Marshal(), logProvider ); topologyService = discoveryServiceFactory.coreTopologyService( config, myself, logProvider, userLogProvider ); - BindingService bindingService = new BindingService( clusterIdStorage, topologyService, logProvider, - Clocks.systemClock(), () -> sleep( 100 ), 300_000 ); life.add( topologyService ); - life.add( bindingService ); dependencies.satisfyDependency( topologyService ); // for tests } diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/state/CommandApplicationProcess.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/state/CommandApplicationProcess.java index 63fd77718ef07..ad21686661590 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/state/CommandApplicationProcess.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/state/CommandApplicationProcess.java @@ -26,17 +26,17 @@ import org.neo4j.coreedge.SessionTracker; import org.neo4j.coreedge.core.consensus.RaftMachine; +import org.neo4j.coreedge.core.consensus.log.RaftLog; +import org.neo4j.coreedge.core.consensus.log.RaftLogEntry; +import org.neo4j.coreedge.core.consensus.log.monitoring.RaftLogCommitIndexMonitor; +import org.neo4j.coreedge.core.consensus.log.segmented.InFlightMap; import org.neo4j.coreedge.core.replication.DistributedOperation; import org.neo4j.coreedge.core.replication.ProgressTracker; import org.neo4j.coreedge.core.state.machines.CoreStateMachines; +import org.neo4j.coreedge.core.state.machines.tx.CoreReplicatedContent; import org.neo4j.coreedge.core.state.snapshot.CoreSnapshot; import org.neo4j.coreedge.core.state.snapshot.CoreStateType; import org.neo4j.coreedge.core.state.storage.StateStorage; -import org.neo4j.coreedge.core.state.machines.tx.CoreReplicatedContent; -import org.neo4j.coreedge.core.consensus.log.RaftLog; -import org.neo4j.coreedge.core.consensus.log.RaftLogEntry; -import org.neo4j.coreedge.core.consensus.log.monitoring.RaftLogCommitIndexMonitor; -import org.neo4j.coreedge.core.consensus.log.segmented.InFlightMap; import org.neo4j.kernel.internal.DatabaseHealth; import org.neo4j.kernel.lifecycle.LifecycleAdapter; import org.neo4j.kernel.monitoring.Monitors; @@ -63,6 +63,7 @@ public class CommandApplicationProcess extends LifecycleAdapter private CoreStateMachines coreStateMachines; + private boolean started; private long lastApplied = NOTHING; private long lastSeenCommitIndex = NOTHING; private long lastFlushed = NOTHING; @@ -98,17 +99,26 @@ public CommandApplicationProcess( synchronized void notifyCommitted( long commitIndex ) { assert this.lastSeenCommitIndex <= commitIndex; + if ( this.lastSeenCommitIndex < commitIndex ) { this.lastSeenCommitIndex = commitIndex; - submitApplyJob( commitIndex ); - commitIndexMonitor.commitIndex( commitIndex ); + + /* ReplicationModule might already be up and running, but we might not + yet be ready to handle requests for applying committed state. At startup + the lastSeenCommitIndex will be taken into consideration. */ + if ( started ) + { + submitApplyJob( commitIndex ); + commitIndexMonitor.commitIndex( commitIndex ); + } } } private void submitApplyJob( long lastToApply ) { - applier.submit( ( status ) -> () -> { + boolean success = applier.submit( ( status ) -> () -> + { try ( InFlightLogEntryReader logEntrySupplier = new InFlightLogEntryReader( raftLog, inFlightMap, true ) ) { for ( long logIndex = lastApplied + 1; !status.isCancelled() && logIndex <= lastToApply; logIndex++ ) @@ -116,7 +126,7 @@ private void submitApplyJob( long lastToApply ) RaftLogEntry entry = logEntrySupplier.get( logIndex ); if ( entry == null ) { - throw new IllegalStateException( "Committed log entry must exist." ); + throw new IllegalStateException( format( "Committed log %d entry must exist.", logIndex ) ); } if ( entry.content() instanceof DistributedOperation ) @@ -137,8 +147,23 @@ private void submitApplyJob( long lastToApply ) { log.error( "Failed to apply up to index " + lastToApply, e ); dbHealth.get().panic( e ); + applier.panic(); } } ); + + if ( !success ) + { + log.error( "Applier has entered a state of panic, no more jobs can be submitted." ); + try + { + // Let's sleep a while so that the log does not get flooded in this state. + // TODO: Consider triggering a shutdown of the database on panic. + Thread.sleep( 1000 ); + } + catch ( InterruptedException ignored ) + { + } + } } synchronized long lastApplied() @@ -146,7 +171,7 @@ synchronized long lastApplied() return lastApplied; } - public void sync() throws InterruptedException + public synchronized void sync() throws InterruptedException { applier.sync( true ); } @@ -208,6 +233,7 @@ private void handleOperations( long commandIndex, List ope { if ( !sessionTracker.validateOperation( operation.globalSession(), operation.operationId() ) ) { + sessionTracker.validateOperation( operation.globalSession(), operation.operationId() ); commandIndex++; continue; } @@ -241,7 +267,15 @@ private void flush() throws IOException @Override public synchronized void start() throws IOException, InterruptedException { - lastFlushed = lastApplied = lastFlushedStorage.getInitialState(); + // TODO: check None/Partial/Full here, because this is the first level which can + // TODO: bootstrapping RAFT can also be performed from here. + + if ( lastFlushed == NOTHING ) + { + lastFlushed = lastFlushedStorage.getInitialState(); + } + lastApplied = lastFlushed; + log.info( format( "Restoring last applied index to %d", lastApplied ) ); sessionTracker.start(); @@ -249,19 +283,22 @@ public synchronized void start() throws IOException, InterruptedException * always be furthest ahead and indicate the furthest possible state to * which we must replay to reach a consistent state. */ long lastPossiblyApplying = max( coreStateMachines.getLastAppliedIndex(), sessionTracker.getLastAppliedIndex() ); + lastPossiblyApplying = max( lastPossiblyApplying, lastSeenCommitIndex ); if ( lastPossiblyApplying > lastApplied ) { - log.info( "Recovering up to: " + lastPossiblyApplying ); + log.info( "Applying up to: " + lastPossiblyApplying ); submitApplyJob( lastPossiblyApplying ); applier.sync( false ); } + + started = true; } @Override public synchronized void stop() throws InterruptedException, IOException { - log.info( "CommandApplicationProcess stopping" ); + started = false; applier.sync( true ); flush(); } @@ -293,11 +330,12 @@ synchronized void installSnapshot( CoreSnapshot coreSnapshot, RaftMachine raft ) { throw new RuntimeException( e ); } - this.lastApplied = this.lastFlushed = snapshotPrevIndex; + lastApplied = lastFlushed = snapshotPrevIndex; log.info( format( "Skipping lastApplied index forward to %d", snapshotPrevIndex ) ); raft.installCoreState( coreSnapshot.get( CoreStateType.RAFT_CORE_STATE ) ); sessionTracker.installSnapshot( coreSnapshot.get( CoreStateType.SESSION_TRACKER ) ); + flush(); } } diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/convert/ConvertClassicStoreToCoreCommand.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/state/CoreBootstrapper.java similarity index 50% rename from enterprise/core-edge/src/main/java/org/neo4j/coreedge/convert/ConvertClassicStoreToCoreCommand.java rename to enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/state/CoreBootstrapper.java index 83035b871403b..9e16959fd092e 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/convert/ConvertClassicStoreToCoreCommand.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/state/CoreBootstrapper.java @@ -17,47 +17,47 @@ * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ -package org.neo4j.coreedge.convert; +package org.neo4j.coreedge.core.state; import java.io.File; import java.io.IOException; import java.util.Collections; +import java.util.Set; +import org.neo4j.coreedge.core.consensus.membership.MembershipEntry; +import org.neo4j.coreedge.core.replication.session.GlobalSessionTrackerState; import org.neo4j.coreedge.core.state.machines.id.IdAllocationState; +import org.neo4j.coreedge.core.state.machines.locks.ReplicatedLockTokenState; import org.neo4j.coreedge.core.state.machines.tx.LogIndexTxHeaderEncoding; -import org.neo4j.coreedge.core.state.storage.DurableStateStorageImporter; +import org.neo4j.coreedge.core.state.snapshot.CoreSnapshot; +import org.neo4j.coreedge.core.state.snapshot.CoreStateType; +import org.neo4j.coreedge.core.state.snapshot.RaftCoreState; +import org.neo4j.coreedge.identity.MemberId; import org.neo4j.coreedge.identity.StoreId; -import org.neo4j.graphdb.factory.EnterpriseGraphDatabaseFactory; -import org.neo4j.graphdb.factory.GraphDatabaseBuilder; -import org.neo4j.graphdb.factory.GraphDatabaseSettings; -import org.neo4j.io.fs.DefaultFileSystemAbstraction; import org.neo4j.io.fs.FileSystemAbstraction; import org.neo4j.io.pagecache.PageCache; -import org.neo4j.kernel.api.exceptions.TransactionFailureException; -import org.neo4j.kernel.impl.api.TransactionCommitProcess; -import org.neo4j.kernel.impl.api.TransactionToApply; -import org.neo4j.kernel.impl.core.DatabasePanicEventGenerator; +import org.neo4j.kernel.configuration.Config; import org.neo4j.kernel.impl.pagecache.StandalonePageCacheFactory; import org.neo4j.kernel.impl.store.MetaDataStore; +import org.neo4j.kernel.impl.store.NeoStores; +import org.neo4j.kernel.impl.store.StoreFactory; import org.neo4j.kernel.impl.store.id.DefaultIdGeneratorFactory; import org.neo4j.kernel.impl.store.id.IdGenerator; import org.neo4j.kernel.impl.store.id.IdType; +import org.neo4j.kernel.impl.transaction.log.FlushableChannel; +import org.neo4j.kernel.impl.transaction.log.LogHeaderCache; +import org.neo4j.kernel.impl.transaction.log.PhysicalLogFile; +import org.neo4j.kernel.impl.transaction.log.PhysicalLogFiles; import org.neo4j.kernel.impl.transaction.log.PhysicalTransactionRepresentation; -import org.neo4j.kernel.impl.transaction.tracing.CommitEvent; -import org.neo4j.kernel.internal.DatabaseHealth; -import org.neo4j.kernel.internal.GraphDatabaseAPI; -import org.neo4j.kernel.internal.KernelEventHandlers; -import org.neo4j.kernel.lifecycle.Lifespan; -import org.neo4j.logging.NullLog; +import org.neo4j.kernel.impl.transaction.log.ReadOnlyLogVersionRepository; +import org.neo4j.kernel.impl.transaction.log.ReadOnlyTransactionIdStore; +import org.neo4j.kernel.impl.transaction.log.TransactionLogWriter; +import org.neo4j.kernel.impl.transaction.log.entry.LogEntryWriter; +import org.neo4j.kernel.monitoring.Monitors; import org.neo4j.logging.NullLogProvider; -import org.neo4j.storageengine.api.TransactionApplicationMode; -import static org.neo4j.coreedge.core.state.machines.CoreStateMachinesModule.ID_ALLOCATION_NAME; import static org.neo4j.kernel.impl.store.MetaDataStore.Position.LAST_TRANSACTION_ID; import static org.neo4j.kernel.impl.store.MetaDataStore.Position.RANDOM_NUMBER; -import static org.neo4j.kernel.impl.store.MetaDataStore.Position.TIME; -import static org.neo4j.kernel.impl.store.MetaDataStore.Position.UPGRADE_TIME; -import static org.neo4j.kernel.impl.store.MetaDataStore.Position.UPGRADE_TRANSACTION_ID; import static org.neo4j.kernel.impl.store.StoreFactory.LABEL_TOKEN_NAMES_STORE_NAME; import static org.neo4j.kernel.impl.store.StoreFactory.LABEL_TOKEN_STORE_NAME; import static org.neo4j.kernel.impl.store.StoreFactory.NODE_LABELS_STORE_NAME; @@ -88,112 +88,76 @@ import static org.neo4j.kernel.impl.store.id.IdType.SCHEMA; import static org.neo4j.kernel.impl.store.id.IdType.STRING_BLOCK; -public class ConvertClassicStoreToCoreCommand +public class CoreBootstrapper { - private final ConversionVerifier conversionVerifier; + private static final long FIRST_INDEX = 0L; + private static final long FIRST_TERM = 0L; - public ConvertClassicStoreToCoreCommand( ConversionVerifier conversionVerifier ) - { - this.conversionVerifier = conversionVerifier; - } - - public void convert( File databaseDir, String recordFormat, String conversionId ) throws IOException, TransactionFailureException - - { - ClusterSeed metadata = ClusterSeed.create( conversionId ); - verify( databaseDir, metadata ); - changeStoreId( databaseDir, metadata ); - appendNullTransactionLogEntryToSetRaftIndexToMinusOne( databaseDir, recordFormat ); - addIdAllocationState( databaseDir ); - } - - private void verify( File databaseDir, ClusterSeed metadata ) throws IOException - { - StoreMetadata storeMetadata = targetMetadata( databaseDir ); - conversionVerifier.conversionGuard( metadata, storeMetadata ); - } - - private StoreMetadata targetMetadata( File databaseDir ) throws IOException - { - FileSystemAbstraction fs = new DefaultFileSystemAbstraction(); - File metadataStore = new File( databaseDir, MetaDataStore.DEFAULT_NAME ); - try ( PageCache pageCache = StandalonePageCacheFactory.createPageCache( fs ) ) - { - StoreId before = readStoreId( metadataStore, pageCache ); - long lastTxId = MetaDataStore.getRecord( pageCache, metadataStore, LAST_TRANSACTION_ID ); - return new StoreMetadata( before, lastTxId ); - } - } + private final File storeDir; + private final PageCache pageCache; + private final FileSystemAbstraction fs; + private final Config config; - private static ClusterSeed changeStoreId( File storeDir, ClusterSeed conversionId ) throws IOException + public CoreBootstrapper( File storeDir, PageCache pageCache, FileSystemAbstraction fs, Config config ) { - FileSystemAbstraction fs = new DefaultFileSystemAbstraction(); - File metadataStore = new File( storeDir, MetaDataStore.DEFAULT_NAME ); - try ( PageCache pageCache = StandalonePageCacheFactory.createPageCache( fs ) ) - { - StoreId before = readStoreId( metadataStore, pageCache ); - - long lastTxId = MetaDataStore.getRecord( pageCache, metadataStore, LAST_TRANSACTION_ID ); - - long upgradeTime = conversionId.after().getUpgradeTime(); - long upgradeId = conversionId.after().getUpgradeId(); - MetaDataStore.setRecord( pageCache, metadataStore, UPGRADE_TIME, upgradeTime ); - MetaDataStore.setRecord( pageCache, metadataStore, UPGRADE_TRANSACTION_ID, upgradeId ); - - StoreId after = readStoreId( metadataStore, pageCache ); - return new ClusterSeed( before, after, lastTxId ); - } + this.storeDir = storeDir; + this.pageCache = pageCache; + this.fs = fs; + this.config = config; } - private static StoreId readStoreId( File metadataStore, PageCache pageCache ) throws IOException + CoreSnapshot bootstrap( Set members ) throws IOException { - long creationTime = MetaDataStore.getRecord( pageCache, metadataStore, TIME ); - long randomNumber = MetaDataStore.getRecord( pageCache, metadataStore, RANDOM_NUMBER ); - long upgradeTime = MetaDataStore.getRecord( pageCache, metadataStore, UPGRADE_TIME ); - long upgradeId = MetaDataStore.getRecord( pageCache, metadataStore, UPGRADE_TRANSACTION_ID ); - return new StoreId( creationTime, randomNumber, upgradeTime, upgradeId ); + StoreFactory factory = new StoreFactory( storeDir, config, + new DefaultIdGeneratorFactory( fs ), pageCache, fs, NullLogProvider.getInstance() ); + + NeoStores neoStores = factory.openAllNeoStores( true ); + neoStores.close(); + + CoreSnapshot coreSnapshot = new CoreSnapshot( FIRST_INDEX, FIRST_TERM ); + coreSnapshot.add( CoreStateType.ID_ALLOCATION, deriveIdAllocationState( storeDir ) ); + coreSnapshot.add( CoreStateType.LOCK_TOKEN, new ReplicatedLockTokenState() ); + coreSnapshot.add( CoreStateType.RAFT_CORE_STATE, + new RaftCoreState( new MembershipEntry( FIRST_INDEX, members ) ) ); + coreSnapshot.add( CoreStateType.SESSION_TRACKER, new GlobalSessionTrackerState() ); + appendNullTransactionLogEntryToSetRaftIndexToMinusOne(); + return coreSnapshot; } - private void appendNullTransactionLogEntryToSetRaftIndexToMinusOne( File dbDir, String recordFormat ) throws - TransactionFailureException + private void appendNullTransactionLogEntryToSetRaftIndexToMinusOne() throws IOException { - GraphDatabaseBuilder builder = new EnterpriseGraphDatabaseFactory().newEmbeddedDatabaseBuilder( dbDir ) - .setConfig( GraphDatabaseSettings.record_format, recordFormat ); - - GraphDatabaseAPI db = (GraphDatabaseAPI) builder.newGraphDatabase(); - - TransactionCommitProcess commitProcess = db.getDependencyResolver().resolveDependency( - TransactionCommitProcess.class ); + PhysicalLogFiles logFiles = new PhysicalLogFiles( storeDir, fs ); + ReadOnlyLogVersionRepository logVersionRepository = new ReadOnlyLogVersionRepository( pageCache, storeDir ); + ReadOnlyTransactionIdStore readOnlyTransactionIdStore = new ReadOnlyTransactionIdStore( pageCache, storeDir ); + PhysicalLogFile logFile = new PhysicalLogFile( fs, logFiles, Long.MAX_VALUE /*don't rotate*/, + () -> readOnlyTransactionIdStore.getLastCommittedTransactionId() - 1, logVersionRepository, + new Monitors().newMonitor( PhysicalLogFile.Monitor.class ), + new LogHeaderCache( 10 ) ); + logFile.init(); + logFile.start(); + + FlushableChannel channel = logFile.getWriter(); + TransactionLogWriter writer = new TransactionLogWriter( new LogEntryWriter( channel ) ); PhysicalTransactionRepresentation tx = new PhysicalTransactionRepresentation( Collections.emptyList() ); byte[] txHeaderBytes = LogIndexTxHeaderEncoding.encodeLogIndexAsTxHeader( -1 ); tx.setHeader( txHeaderBytes, -1, -1, -1, -1, -1, -1 ); - commitProcess.commit( new TransactionToApply( tx ), CommitEvent.NULL, TransactionApplicationMode.EXTERNAL ); - - db.shutdown(); - } - - private void addIdAllocationState( File dbDir ) throws IOException - { - File clusterStateDirectory = new File( dbDir, "cluster-state" ); + long dummyTransactionId = readOnlyTransactionIdStore.getLastCommittedTransactionId() + 1; + writer.append( tx, dummyTransactionId ); + channel.prepareForFlush().flush(); - DefaultFileSystemAbstraction fileSystem = new DefaultFileSystemAbstraction(); - DefaultIdGeneratorFactory factory = new DefaultIdGeneratorFactory( fileSystem ); + logFile.stop(); + logFile.shutdown(); - DurableStateStorageImporter storage = new DurableStateStorageImporter<>( - fileSystem, clusterStateDirectory, ID_ALLOCATION_NAME, new IdAllocationState.Marshal(), - 1000, () -> new DatabaseHealth( new DatabasePanicEventGenerator( new KernelEventHandlers( NullLog.getInstance() ) ), - NullLog.getInstance() ), NullLogProvider.getInstance() ); - - try ( Lifespan lifespan = new Lifespan( storage ) ) - { - storage.persist( state( dbDir, factory ) ); - } + File neoStoreFile = new File( storeDir, MetaDataStore.DEFAULT_NAME ); + MetaDataStore.setRecord( pageCache, neoStoreFile, LAST_TRANSACTION_ID, dummyTransactionId ); } - private IdAllocationState state( File dbDir, DefaultIdGeneratorFactory factory ) + private IdAllocationState deriveIdAllocationState( File dbDir ) throws IOException { + DefaultIdGeneratorFactory factory = new DefaultIdGeneratorFactory( fs ); + long[] highIds = new long[]{ getHighId( dbDir, factory, NODE, NODE_STORE_NAME ), getHighId( dbDir, factory, RELATIONSHIP, RELATIONSHIP_STORE_NAME ), @@ -211,7 +175,7 @@ private IdAllocationState state( File dbDir, DefaultIdGeneratorFactory factory ) getHighId( dbDir, factory, NODE_LABELS, NODE_LABELS_STORE_NAME ), getHighId( dbDir, factory, RELATIONSHIP_GROUP, RELATIONSHIP_GROUP_STORE_NAME )}; - return new IdAllocationState( highIds, -1 ); + return new IdAllocationState( highIds, FIRST_INDEX ); } private long getHighId( File coreDir, DefaultIdGeneratorFactory factory, IdType idType, String store ) @@ -222,7 +186,7 @@ private long getHighId( File coreDir, DefaultIdGeneratorFactory factory, IdType return highId; } - private String idFile( String store ) + private static String idFile( String store ) { return MetaDataStore.DEFAULT_NAME + store + ".id"; } diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/state/CoreState.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/state/CoreState.java index 8d9fb10f5eb8e..1669c560036a3 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/state/CoreState.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/state/CoreState.java @@ -20,39 +20,49 @@ package org.neo4j.coreedge.core.state; import java.io.IOException; +import java.util.concurrent.CountDownLatch; import org.neo4j.coreedge.catchup.storecopy.LocalDatabase; import org.neo4j.coreedge.catchup.storecopy.StoreCopyFailedException; -import org.neo4j.coreedge.core.state.snapshot.CoreSnapshot; -import org.neo4j.coreedge.core.state.snapshot.CoreStateDownloader; import org.neo4j.coreedge.core.consensus.RaftMachine; import org.neo4j.coreedge.core.consensus.RaftMessages; import org.neo4j.coreedge.core.consensus.log.pruning.LogPruner; import org.neo4j.coreedge.core.consensus.outcome.ConsensusOutcome; +import org.neo4j.coreedge.core.state.snapshot.CoreSnapshot; +import org.neo4j.coreedge.core.state.snapshot.CoreStateDownloader; +import org.neo4j.coreedge.identity.ClusterId; import org.neo4j.coreedge.identity.MemberId; import org.neo4j.coreedge.messaging.Inbound.MessageHandler; import org.neo4j.kernel.lifecycle.Lifecycle; import org.neo4j.logging.Log; import org.neo4j.logging.LogProvider; +import static java.util.concurrent.TimeUnit.MINUTES; + public class CoreState implements MessageHandler, LogPruner, Lifecycle { private final RaftMachine raftMachine; private final LocalDatabase localDatabase; private final Log log; private final CoreStateDownloader downloader; + private final BindingService bindingService; private final CommandApplicationProcess applicationProcess; + private final CountDownLatch bootstrapLatch = new CountDownLatch( 1 ); + + private ClusterId boundClusterId; // TODO: Use for network message filtering. public CoreState( RaftMachine raftMachine, LocalDatabase localDatabase, LogProvider logProvider, CoreStateDownloader downloader, + BindingService bindingService, CommandApplicationProcess commandApplicationProcess ) { this.raftMachine = raftMachine; this.localDatabase = localDatabase; this.downloader = downloader; + this.bindingService = bindingService; this.log = logProvider.getLog( getClass() ); this.applicationProcess = commandApplicationProcess; } @@ -64,11 +74,11 @@ public void handle( RaftMessages.StoreIdAwareMessage storeIdAwareMessage ) ConsensusOutcome outcome = raftMachine.handle( storeIdAwareMessage.message() ); if ( outcome.needsFreshSnapshot() ) { - notifyNeedFreshSnapshot( storeIdAwareMessage.message().from() ); + downloadSnapshot( storeIdAwareMessage.message().from() ); } else { - notifyCommitted( outcome.getCommitIndex()); + notifyCommitted( outcome.getCommitIndex() ); } } catch ( Throwable e ) @@ -83,17 +93,12 @@ private synchronized void notifyCommitted( long commitIndex ) applicationProcess.notifyCommitted( commitIndex ); } - private synchronized void notifyNeedFreshSnapshot( MemberId source ) - { - downloadSnapshot( source ); - } - /** * Attempts to download a fresh snapshot from another core instance. * * @param source The source address to attempt a download of a snapshot from. */ - private void downloadSnapshot( MemberId source ) + private synchronized void downloadSnapshot( MemberId source ) { try { @@ -111,9 +116,10 @@ public synchronized CoreSnapshot snapshot() throws IOException, InterruptedExcep return applicationProcess.snapshot( raftMachine ); } - public synchronized void installSnapshot( CoreSnapshot coreSnapshot ) throws IOException + public synchronized void installSnapshot( CoreSnapshot coreSnapshot ) throws Throwable { applicationProcess.installSnapshot( coreSnapshot, raftMachine ); + bootstrapLatch.countDown(); } @SuppressWarnings("unused") // used in embedded robustness testing @@ -129,27 +135,53 @@ public void prune() throws IOException } @Override - public void start() throws IOException, InterruptedException + public void init() throws Throwable { - applicationProcess.start(); + localDatabase.init(); + applicationProcess.init(); } @Override - public void stop() throws IOException, InterruptedException + public void start() throws Throwable { - log.info( "CoreState stopping" ); - applicationProcess.stop(); + // How can state be installed? + // 1. Already installed (detected by checking on-disk state) + // 2. Bootstrap (single selected server) + // 3. Download from someone else (others) + + // TODO: Binding service can return whether or not we are allowed to bootstrap. ClusterId can be exposed at the interface. + boundClusterId = bindingService.bindToCluster( this::installSnapshot ); + + // TODO: Move haveState and CoreBootstrapper into CommandApplicationProcess, which perhaps needs a better name. + // TODO: Include the None/Partial/Full in the move. + if ( !haveState() ) + { + boolean acquired = bootstrapLatch.await( 1, MINUTES ); + if ( !acquired ) + { + throw new RuntimeException( "Timed out while waiting to download a snapshot from another cluster member" ); + } + } + localDatabase.start(); + applicationProcess.start(); + } + + private boolean haveState() + { + return raftMachine.state().entryLog().appendIndex() > -1; } @Override - public void init() throws Throwable + public void stop() throws Throwable { - applicationProcess.init(); + applicationProcess.stop(); + localDatabase.stop(); } @Override public void shutdown() throws Throwable { applicationProcess.shutdown(); + localDatabase.shutdown(); } } diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/state/CoreStateApplier.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/state/CoreStateApplier.java index e37c4883d7b4e..66ba09a3997ba 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/state/CoreStateApplier.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/state/CoreStateApplier.java @@ -39,7 +39,7 @@ public class Status { private volatile boolean cancelled; - public boolean isCancelled() + boolean isCancelled() { return cancelled; } @@ -50,6 +50,7 @@ public boolean isCancelled() private final Status status = new Status(); private ExecutorService applier; + private boolean isPanic; public CoreStateApplier( LogProvider logProvider ) { @@ -70,40 +71,60 @@ private void spawnExecutor() * @param abortableTask A function that creates a runnable that can use * the status flag to decide when to abort. */ - public void submit( Function abortableTask ) + public boolean submit( Function abortableTask ) { + if ( isPanic ) + { + return false; + } + if ( status.cancelled ) { log.warn( "Task submitted while cancelled" ); } applier.submit( abortableTask.apply( status ) ); + return true; } /** * Used for synchronizing with the internal executor. * * @param cancelTasks Whether or not to flag for cancelling. - * - * @throws InterruptedException */ - public void sync( boolean cancelTasks ) throws InterruptedException + public void sync( boolean cancelTasks ) { - if ( applier != null ) + if ( cancelTasks ) + { + status.cancelled = true; + } + + applier.shutdown(); + + do { - if ( cancelTasks ) + try { - status.cancelled = true; + applier.awaitTermination( 1, MINUTES ); + } + catch ( InterruptedException ignored ) + { + log.warn( "Unexpected interrupt", ignored ); } - applier.shutdown(); - - while ( !applier.awaitTermination( 1, MINUTES ) ) + if ( !applier.isTerminated() ) { log.warn( "Applier is taking an unusually long time to sync" ); } } + while ( !applier.isTerminated() ); spawnExecutor(); } + + public void panic() + { + applier.shutdown(); + isPanic = true; + } } diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/state/DumpClusterState.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/state/DumpClusterState.java index f15acb1e36987..a4fbe2019b823 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/state/DumpClusterState.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/state/DumpClusterState.java @@ -31,6 +31,7 @@ import org.neo4j.coreedge.core.state.machines.id.IdAllocationState; import org.neo4j.coreedge.core.state.machines.locks.ReplicatedLockTokenState; import org.neo4j.coreedge.core.state.storage.DurableStateStorage; +import org.neo4j.coreedge.core.state.storage.SimpleFileStorage; import org.neo4j.coreedge.core.state.storage.SimpleStorage; import org.neo4j.coreedge.core.state.storage.StateMarshal; import org.neo4j.coreedge.identity.MemberId; @@ -41,7 +42,7 @@ import org.neo4j.kernel.lifecycle.Lifespan; import org.neo4j.logging.NullLogProvider; -import static org.neo4j.coreedge.ReplicationModule.LAST_FLUSHED_NAME; +import static org.neo4j.coreedge.core.server.CoreServerModule.LAST_FLUSHED_NAME; import static org.neo4j.coreedge.ReplicationModule.SESSION_TRACKER_NAME; import static org.neo4j.coreedge.core.EnterpriseCoreEditionModule.CLUSTER_STATE_DIRECTORY_NAME; import static org.neo4j.coreedge.core.IdentityModule.CORE_MEMBER_ID_NAME; @@ -86,7 +87,7 @@ public static void main( String[] args ) throws IOException void dump() throws IOException { - SimpleStorage memberIdStorage = new SimpleStorage<>( fs, clusterStateDirectory, CORE_MEMBER_ID_NAME, + SimpleStorage memberIdStorage = new SimpleFileStorage<>( fs, clusterStateDirectory, CORE_MEMBER_ID_NAME, new Marshal(), NullLogProvider.getInstance() ); if ( memberIdStorage.exists() ) { diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/state/machines/CoreStateMachinesModule.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/state/machines/CoreStateMachinesModule.java index 8fa8db0ba9be2..c3038d8a0820c 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/state/machines/CoreStateMachinesModule.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/state/machines/CoreStateMachinesModule.java @@ -20,7 +20,6 @@ package org.neo4j.coreedge.core.state.machines; import java.io.File; -import java.io.IOException; import java.util.HashMap; import java.util.Map; diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/state/machines/locks/ReplicatedLockTokenState.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/state/machines/locks/ReplicatedLockTokenState.java index 8defdf2573619..85fc579f38f57 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/state/machines/locks/ReplicatedLockTokenState.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/state/machines/locks/ReplicatedLockTokenState.java @@ -20,6 +20,7 @@ package org.neo4j.coreedge.core.state.machines.locks; import java.io.IOException; +import java.util.Objects; import org.neo4j.coreedge.messaging.marshalling.ChannelMarshal; import org.neo4j.coreedge.messaging.EndOfStreamException; @@ -35,11 +36,11 @@ public class ReplicatedLockTokenState private ReplicatedLockTokenRequest currentToken = INVALID_REPLICATED_LOCK_TOKEN_REQUEST; private long ordinal = -1L; - ReplicatedLockTokenState() + public ReplicatedLockTokenState() { } - ReplicatedLockTokenState( long ordinal, ReplicatedLockTokenRequest currentToken ) + public ReplicatedLockTokenState( long ordinal, ReplicatedLockTokenRequest currentToken ) { this.ordinal = ordinal; this.currentToken = currentToken; @@ -64,10 +65,29 @@ long ordinal() @Override public String toString() { - return "ReplicatedLockTokenState{" + - "currentToken=" + currentToken + - ", ordinal=" + ordinal + - '}'; + return String.format( "ReplicatedLockTokenState{currentToken=%s, ordinal=%d}", currentToken, ordinal ); + } + + @Override + public boolean equals( Object o ) + { + if ( this == o ) + { + return true; + } + if ( o == null || getClass() != o.getClass() ) + { + return false; + } + ReplicatedLockTokenState that = (ReplicatedLockTokenState) o; + return ordinal == that.ordinal && + Objects.equals( currentToken, that.currentToken ); + } + + @Override + public int hashCode() + { + return Objects.hash( currentToken, ordinal ); } ReplicatedLockTokenState newInstance() diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/state/machines/tx/LastCommittedIndexFinder.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/state/machines/tx/LastCommittedIndexFinder.java index 5f4c98e5ecce0..d55d1906eee62 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/state/machines/tx/LastCommittedIndexFinder.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/state/machines/tx/LastCommittedIndexFinder.java @@ -33,21 +33,21 @@ * Finds the last committed transaction in the transaction log, then decodes the header as a raft index. * This allows us to correlate raft log with transaction log on recovery. */ -class LastCommittedIndexFinder +public class LastCommittedIndexFinder { private final TransactionIdStore transactionIdStore; private final LogicalTransactionStore transactionStore; private final Log log; - LastCommittedIndexFinder( TransactionIdStore transactionIdStore, - LogicalTransactionStore transactionStore, LogProvider logProvider ) + public LastCommittedIndexFinder( TransactionIdStore transactionIdStore, + LogicalTransactionStore transactionStore, LogProvider logProvider ) { this.transactionIdStore = transactionIdStore; this.transactionStore = transactionStore; this.log = logProvider.getLog( getClass() ); } - long getLastCommittedIndex() + public long getLastCommittedIndex() { long lastCommittedIndex; long lastTxId = transactionIdStore.getLastCommittedTransactionId(); diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/state/storage/SimpleFileStorage.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/state/storage/SimpleFileStorage.java new file mode 100644 index 0000000000000..799e60d61ef23 --- /dev/null +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/state/storage/SimpleFileStorage.java @@ -0,0 +1,82 @@ +/* + * Copyright (c) 2002-2016 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package org.neo4j.coreedge.core.state.storage; + +import java.io.File; +import java.io.IOException; + +import org.neo4j.coreedge.messaging.EndOfStreamException; +import org.neo4j.coreedge.messaging.marshalling.ChannelMarshal; +import org.neo4j.io.fs.FileSystemAbstraction; +import org.neo4j.kernel.impl.transaction.log.FlushableChannel; +import org.neo4j.kernel.impl.transaction.log.PhysicalFlushableChannel; +import org.neo4j.kernel.impl.transaction.log.ReadAheadChannel; +import org.neo4j.kernel.impl.transaction.log.ReadableClosableChannel; +import org.neo4j.logging.Log; +import org.neo4j.logging.LogProvider; + +public class SimpleFileStorage implements SimpleStorage +{ + private final FileSystemAbstraction fileSystem; + private final ChannelMarshal marshal; + private final File file; + private Log log; + + public SimpleFileStorage( FileSystemAbstraction fileSystem, File directory, String name, + ChannelMarshal marshal, LogProvider logProvider ) + { + this.fileSystem = fileSystem; + this.log = logProvider.getLog( getClass() ); + this.file = new File( DurableStateStorage.stateDir( directory, name ), name ); + this.marshal = marshal; + } + + @Override + public boolean exists() + { + return fileSystem.fileExists( file ); + } + + @Override + public T readState() throws IOException + { + try ( ReadableClosableChannel channel = new ReadAheadChannel<>( fileSystem.open( file, "r" ) ) ) + { + return marshal.unmarshal( channel ); + } + catch ( EndOfStreamException e ) + { + log.error( "End of stream reached: " + file ); + throw new IOException( e ); + } + } + + @Override + public void writeState( T state ) throws IOException + { + fileSystem.mkdirs( file.getParentFile() ); + fileSystem.deleteFile( file ); + + try ( FlushableChannel channel = new PhysicalFlushableChannel( fileSystem.create( file ) ) ) + { + marshal.marshal( state, channel ); + } + } +} diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/state/storage/SimpleStorage.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/state/storage/SimpleStorage.java index c194cbd835045..5f096a4b6bd52 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/state/storage/SimpleStorage.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/state/storage/SimpleStorage.java @@ -19,61 +19,13 @@ */ package org.neo4j.coreedge.core.state.storage; -import java.io.File; import java.io.IOException; -import org.neo4j.coreedge.messaging.EndOfStreamException; -import org.neo4j.coreedge.messaging.marshalling.ChannelMarshal; -import org.neo4j.io.fs.FileSystemAbstraction; -import org.neo4j.kernel.impl.transaction.log.FlushableChannel; -import org.neo4j.kernel.impl.transaction.log.PhysicalFlushableChannel; -import org.neo4j.kernel.impl.transaction.log.ReadAheadChannel; -import org.neo4j.kernel.impl.transaction.log.ReadableClosableChannel; -import org.neo4j.logging.Log; -import org.neo4j.logging.LogProvider; - -public class SimpleStorage +public interface SimpleStorage { - private final FileSystemAbstraction fileSystem; - private final ChannelMarshal marshal; - private final File file; - private Log log; - - public SimpleStorage( FileSystemAbstraction fileSystem, File directory, String name, - ChannelMarshal marshal, LogProvider logProvider ) - { - this.fileSystem = fileSystem; - this.log = logProvider.getLog( getClass() ); - this.file = new File( DurableStateStorage.stateDir( directory, name ), name ); - this.marshal = marshal; - } - - public boolean exists() - { - return fileSystem.fileExists( file ); - } - - public T readState() throws IOException - { - try ( ReadableClosableChannel channel = new ReadAheadChannel<>( fileSystem.open( file, "r" ) ) ) - { - return marshal.unmarshal( channel ); - } - catch ( EndOfStreamException e ) - { - log.error( "End of stream reached: " + file ); - throw new IOException( e ); - } - } + boolean exists(); - public void writeState( T state ) throws IOException - { - fileSystem.mkdirs( file.getParentFile() ); - fileSystem.deleteFile( file ); + T readState() throws IOException; - try ( FlushableChannel channel = new PhysicalFlushableChannel( fileSystem.create( file ) ) ) - { - marshal.marshal( state, channel ); - } - } + void writeState( T state ) throws IOException; } diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/edge/EdgeStartupProcess.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/edge/EdgeStartupProcess.java index 1ae6ee52e9097..54a7c85a6af2f 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/edge/EdgeStartupProcess.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/edge/EdgeStartupProcess.java @@ -76,14 +76,10 @@ public void init() throws Throwable @Override public void start() throws Throwable { - localDatabase.start(); - MemberId source = findCoreMemberToCopyFrom(); if ( localDatabase.isEmpty() ) { log.info( "Local database is empty, attempting to replace with copy from core server %s", source ); - log.info( "Stopping local database before copy." ); - localDatabase.stop(); log.info( "Finding store id of core server %s", source ); StoreId storeId = storeFetcher.getStoreIdOf( source ); @@ -93,13 +89,13 @@ public void start() throws Throwable copyWholeStoreFrom( source, storeId, storeFetcher ); log.info( "Restarting local database after copy.", source ); - localDatabase.start(); } else { ensureSameStoreIdAs( source ); } + localDatabase.start(); txPulling.start(); } diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/edge/EnterpriseEdgeEditionModule.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/edge/EnterpriseEdgeEditionModule.java index 582fd9712fb47..d8479b12ed325 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/edge/EnterpriseEdgeEditionModule.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/edge/EnterpriseEdgeEditionModule.java @@ -217,8 +217,8 @@ protected Log authManagerLog() LocalDatabase localDatabase = new LocalDatabase( platformModule.storeDir, new StoreFiles( new DefaultFileSystemAbstraction() ), platformModule.dataSourceManager, - dependencies.provideDependency( TransactionIdStore.class ), - databaseHealthSupplier, logProvider ); + pageCache, + databaseHealthSupplier ); TxPollingClient txPuller = new TxPollingClient( logProvider, localDatabase, catchUpClient, new ConnectToRandomCoreMember( discoveryService ), diff --git a/enterprise/core-edge/src/main/resources/META-INF/services/org.neo4j.commandline.admin.AdminCommand$Provider b/enterprise/core-edge/src/main/resources/META-INF/services/org.neo4j.commandline.admin.AdminCommand$Provider deleted file mode 100644 index 5a9fef5e0510e..0000000000000 --- a/enterprise/core-edge/src/main/resources/META-INF/services/org.neo4j.commandline.admin.AdminCommand$Provider +++ /dev/null @@ -1,2 +0,0 @@ -org.neo4j.coreedge.backup.RestoreNewClusterCli$Provider -org.neo4j.coreedge.backup.RestoreExistingClusterCli$Provider diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/backup/BackupCoreIT.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/backup/BackupCoreIT.java index 790755e2d2b95..243382377a31b 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/backup/BackupCoreIT.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/backup/BackupCoreIT.java @@ -19,92 +19,53 @@ */ package org.neo4j.coreedge.backup; -import java.io.ByteArrayOutputStream; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; + import java.io.File; -import java.io.PrintStream; import java.net.InetSocketAddress; -import java.nio.file.Path; -import java.nio.file.Paths; import java.util.ArrayList; -import java.util.Collection; import java.util.List; import java.util.concurrent.TimeoutException; -import java.util.stream.Stream; - -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; import org.neo4j.backup.OnlineBackupSettings; -import org.neo4j.coreedge.TestStoreId; import org.neo4j.coreedge.core.CoreEdgeClusterSettings; import org.neo4j.coreedge.core.CoreGraphDatabase; import org.neo4j.coreedge.discovery.Cluster; import org.neo4j.coreedge.discovery.CoreClusterMember; -import org.neo4j.coreedge.identity.StoreId; import org.neo4j.graphdb.Node; import org.neo4j.graphdb.factory.GraphDatabaseSettings; import org.neo4j.helpers.collection.MapUtil; -import org.neo4j.io.fs.DefaultFileSystemAbstraction; import org.neo4j.kernel.configuration.Config; -import org.neo4j.kernel.configuration.Settings; import org.neo4j.kernel.impl.factory.GraphDatabaseFacade; import org.neo4j.kernel.impl.store.format.standard.StandardV3_0; import org.neo4j.test.DbRepresentation; import org.neo4j.test.coreedge.ClusterRule; -import org.neo4j.test.rule.SuppressOutput; - -import static java.util.stream.Collectors.toList; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotEquals; - import static org.neo4j.backup.BackupEmbeddedIT.runBackupToolFromOtherJvmToGetExitCode; -import static org.neo4j.coreedge.TestStoreId.assertAllStoresHaveTheSameStoreId; -import static org.neo4j.coreedge.backup.ArgsBuilder.args; -import static org.neo4j.coreedge.backup.ArgsBuilder.toArray; import static org.neo4j.graphdb.Label.label; public class BackupCoreIT { @Rule - public SuppressOutput suppressOutput = SuppressOutput.suppressAll(); - - @Rule - public ClusterRule clusterRule = new ClusterRule( BackupCoreIT.class ) + public ClusterRule clusterRule = new ClusterRule( getClass() ) .withNumberOfCoreMembers( 3 ) .withNumberOfEdgeMembers( 0 ) - .withSharedCoreParam( OnlineBackupSettings.online_backup_enabled, Settings.TRUE ) .withInstanceCoreParam( OnlineBackupSettings.online_backup_server, serverId -> (":" + (8000 + serverId)) ); + private Cluster cluster; - private File backupPath; - private DefaultFileSystemAbstraction fs = new DefaultFileSystemAbstraction(); + private File backupsDir; @Before public void setup() throws Exception { - backupPath = clusterRule.testDirectory().cleanDirectory( "backup-db" ); + backupsDir = clusterRule.testDirectory().cleanDirectory( "backups" ); cluster = clusterRule.startCluster(); } - @Test - public void makeSureBackupCanBePerformed() throws Throwable - { - // Run backup - CoreGraphDatabase db = createSomeData( cluster ); - DbRepresentation beforeChange = DbRepresentation.of( db ); - String[] args = backupArguments( backupAddress( db ), backupPath.getPath() ); - assertEquals( 0, runBackupToolFromOtherJvmToGetExitCode( args ) ); - - // Add some new data - DbRepresentation afterChange = DbRepresentation.of( createSomeData( cluster ) ); - - // Verify that backed up database can be started and compare representation - DbRepresentation backupRepresentation = DbRepresentation.of( backupPath, getConfig() ); - assertEquals( beforeChange, backupRepresentation ); - assertNotEquals( backupRepresentation, afterChange ); - } - @Test public void makeSureBackupCanBePerformedFromAnyInstance() throws Throwable { @@ -112,73 +73,20 @@ public void makeSureBackupCanBePerformedFromAnyInstance() throws Throwable { // Run backup DbRepresentation beforeChange = DbRepresentation.of( createSomeData( cluster ) ); - File backupPathPerCoreMachine = new File( backupPath, "" + db.id().hashCode() ); - String[] args = backupArguments( backupAddress( db.database() ), backupPathPerCoreMachine.getPath() ); + File backupDir = new File( backupsDir, Integer.toString( db.serverId() ) ); + String[] args = backupArguments( backupAddress( db.database() ), backupDir.getPath() ); assertEquals( 0, runBackupToolFromOtherJvmToGetExitCode( args ) ); // Add some new data DbRepresentation afterChange = DbRepresentation.of( createSomeData( cluster ) ); // Verify that old data is back - DbRepresentation backupRepresentation = DbRepresentation.of( backupPathPerCoreMachine, getConfig() ); + DbRepresentation backupRepresentation = DbRepresentation.of( backupDir, getConfig() ); assertEquals( beforeChange, backupRepresentation ); assertNotEquals( backupRepresentation, afterChange ); } } - @Test - public void makeSureCoreClusterCanBeRestoredFromABackup() throws Throwable - { - // given - CoreGraphDatabase db = createSomeData( cluster ); - DbRepresentation beforeBackup = DbRepresentation.of( db ); - String[] args = backupArguments( backupAddress( db ), backupPath.getPath() ); - assertEquals( 0, runBackupToolFromOtherJvmToGetExitCode( args ) ); - - // when we shutdown the cluster we lose the number of core servers so we won't go through the for loop unless - // we capture the count beforehand - List dbPaths = cluster.coreMembers().stream().map( CoreClusterMember::storeDir ).collect( toList() ); - int numberOfCoreMembers = dbPaths.size(); - - cluster.shutdown(); - assertAllStoresHaveTheSameStoreId( dbPaths, fs ); - StoreId storeId = TestStoreId.readStoreId( dbPaths.get( 0 ), fs ); - - // when - - ByteArrayOutputStream output = new ByteArrayOutputStream(); - PrintStream sysOut = new PrintStream( output ); - - Path homeDir = Paths.get( cluster.getCoreMemberById( 0 ).homeDir().getPath() ); - new RestoreNewClusterCli( homeDir, homeDir, sysOut ).execute( toArray( args().from( backupPath ) - .database( "graph.db" ).force().build() ) ); - - String seed = RestoreClusterCliTest.extractSeed( output.toString() ); - - for ( int i = 1; i < numberOfCoreMembers; i++ ) - { - homeDir = Paths.get( cluster.getCoreMemberById( i ).homeDir().getPath() ); - new RestoreExistingClusterCli( homeDir, homeDir ).execute( - toArray( args().from( backupPath ).database( "graph.db" ).seed( seed ).force().build() ) ); - } - - cluster.start(); - - // then - Collection coreGraphDatabases = cluster.coreMembers(); - Stream dbRepresentations = coreGraphDatabases.stream().map( x -> DbRepresentation.of( x - .database() ) ); - dbRepresentations.forEach( afterReSeed -> assertEquals( beforeBackup, afterReSeed ) ); - - List afterRestoreDbPaths = coreGraphDatabases.stream().map( CoreClusterMember::storeDir ).collect( - toList() ); - cluster.shutdown(); - - assertAllStoresHaveTheSameStoreId( afterRestoreDbPaths, fs ); - StoreId afterRestoreStoreId = TestStoreId.readStoreId( afterRestoreDbPaths.get( 0 ), fs ); - assertNotEquals( storeId, afterRestoreStoreId ); - } - static CoreGraphDatabase createSomeData( Cluster cluster ) throws TimeoutException, InterruptedException { return cluster.coreTx( ( db, tx ) -> diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/backup/ClusterSeedingIT.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/backup/ClusterSeedingIT.java new file mode 100644 index 0000000000000..73b0c1ea83a95 --- /dev/null +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/backup/ClusterSeedingIT.java @@ -0,0 +1,133 @@ +/* + * Copyright (c) 2002-2016 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package org.neo4j.coreedge.backup; + +import org.junit.After; +import org.junit.Before; +import org.junit.Ignore; +import org.junit.Rule; +import org.junit.Test; + +import java.io.File; +import java.util.HashMap; +import java.util.function.IntFunction; + +import org.neo4j.backup.OnlineBackupSettings; +import org.neo4j.coreedge.core.CoreGraphDatabase; +import org.neo4j.coreedge.discovery.Cluster; +import org.neo4j.coreedge.discovery.SharedDiscoveryService; +import org.neo4j.io.fs.DefaultFileSystemAbstraction; +import org.neo4j.kernel.impl.store.format.standard.StandardV3_0; +import org.neo4j.test.DbRepresentation; +import org.neo4j.test.rule.TestDirectory; + +import static org.junit.Assert.assertEquals; +import static org.neo4j.backup.BackupEmbeddedIT.runBackupToolFromOtherJvmToGetExitCode; +import static org.neo4j.coreedge.backup.BackupCoreIT.backupAddress; +import static org.neo4j.coreedge.discovery.Cluster.dataMatchesEventually; +import static org.neo4j.helpers.collection.MapUtil.stringMap; + +public class ClusterSeedingIT +{ + private Cluster backupCluster; + private Cluster cluster; + private DefaultFileSystemAbstraction fsa = new DefaultFileSystemAbstraction(); + + @Rule + public TestDirectory testDir = TestDirectory.testDirectory(); + + @Before + public void setup() throws Exception + { + HashMap> instanceCoreParams = new HashMap<>(); + instanceCoreParams.put( + OnlineBackupSettings.online_backup_server.name(), + serverId -> (":" + (8000 + serverId)) ); + + backupCluster = new Cluster( testDir.directory( "cluster-for-backup" ), 3, 0, new SharedDiscoveryService(), stringMap(), + instanceCoreParams, stringMap(), new HashMap<>(), StandardV3_0.NAME ); + + cluster = new Cluster( testDir.directory( "cluster-b" ), 3, 0, new SharedDiscoveryService(), stringMap(), + new HashMap<>(), stringMap(), new HashMap<>(), StandardV3_0.NAME ); + } + + @After + public void after() throws Exception + { + if ( backupCluster != null ) + { + backupCluster.shutdown(); + } + if ( cluster != null ) + { + cluster.shutdown(); + } + } + + private File createBackupUsingCoreCluster() throws Exception + { + File backupDir = testDir.directory( "backups" ); + + backupCluster.start(); + CoreGraphDatabase db = BackupCoreIT.createSomeData( backupCluster ); + + String[] args = BackupCoreIT.backupArguments( backupAddress( db ), backupDir.getPath() ); + assertEquals( 0, runBackupToolFromOtherJvmToGetExitCode( args ) ); + backupCluster.shutdown(); + + return backupDir; + } + + @Test + public void shouldRestoreBySeedingAllMembers() throws Throwable + { + // given + File backupDir = createBackupUsingCoreCluster(); + DbRepresentation before = DbRepresentation.of( backupDir ); + + // when + fsa.copyRecursively( testDir.directory( "backups" ), cluster.getCoreMemberById( 0 ).storeDir() ); + fsa.copyRecursively( testDir.directory( "backups" ), cluster.getCoreMemberById( 1 ).storeDir() ); + fsa.copyRecursively( testDir.directory( "backups" ), cluster.getCoreMemberById( 2 ).storeDir() ); + cluster.start(); + + // then + dataMatchesEventually( before, cluster.coreMembers() ); + } + + @Test + @Ignore("need to seed all members for now") + public void shouldRestoreBySeedingSingleMember() throws Throwable + { + // given + File backupDir = createBackupUsingCoreCluster(); + DbRepresentation before = DbRepresentation.of( backupDir ); + + // when + fsa.copyRecursively( testDir.directory( "backups" ), cluster.getCoreMemberById( 0 ).storeDir() ); + cluster.getCoreMemberById( 0 ).start(); + Thread.sleep( 2_000 ); + cluster.getCoreMemberById( 1 ).start(); + cluster.getCoreMemberById( 2 ).start(); + + // then + dataMatchesEventually( before, cluster.coreMembers() ); + } +} diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/backup/RestoreClusterCliTest.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/backup/RestoreClusterCliTest.java deleted file mode 100644 index 9ed9c403db1f5..0000000000000 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/backup/RestoreClusterCliTest.java +++ /dev/null @@ -1,132 +0,0 @@ -/* - * Copyright (c) 2002-2016 "Neo Technology," - * Network Engine for Objects in Lund AB [http://neotechnology.com] - * - * This file is part of Neo4j. - * - * Neo4j is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License as - * published by the Free Software Foundation, either version 3 of the - * License, or (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ -package org.neo4j.coreedge.backup; - -import org.junit.Rule; -import org.junit.Test; - -import java.io.ByteArrayOutputStream; -import java.io.File; -import java.io.IOException; -import java.io.PrintStream; -import java.nio.file.Path; -import java.nio.file.Paths; -import java.util.Optional; - -import org.neo4j.coreedge.convert.ClusterSeed; -import org.neo4j.coreedge.convert.StoreMetadata; -import org.neo4j.coreedge.identity.StoreId; -import org.neo4j.dbms.DatabaseManagementSystemSettings; -import org.neo4j.io.fs.DefaultFileSystemAbstraction; -import org.neo4j.io.fs.FileSystemAbstraction; -import org.neo4j.io.pagecache.PageCache; -import org.neo4j.kernel.configuration.Config; -import org.neo4j.kernel.impl.pagecache.StandalonePageCacheFactory; -import org.neo4j.kernel.impl.store.MetaDataStore; -import org.neo4j.kernel.impl.store.format.standard.StandardV3_0; -import org.neo4j.server.configuration.ConfigLoader; -import org.neo4j.test.coreedge.ClusterRule; -import org.neo4j.test.rule.TestDirectory; - -import static junit.framework.TestCase.assertFalse; -import static junit.framework.TestCase.assertTrue; -import static org.junit.Assert.assertEquals; -import static org.neo4j.coreedge.backup.ArgsBuilder.toArray; -import static org.neo4j.coreedge.convert.GenerateClusterSeedCommand.storeId; -import static org.neo4j.kernel.impl.store.MetaDataStore.Position.LAST_TRANSACTION_ID; -import static org.neo4j.kernel.impl.store.MetaDataStore.Position.UPGRADE_TIME; -import static org.neo4j.kernel.impl.store.MetaDataStore.Position.UPGRADE_TRANSACTION_ID; -import static org.neo4j.kernel.impl.store.MetaDataStore.getRecord; - -public class RestoreClusterCliTest -{ - @Rule - public TestDirectory testDirectory = TestDirectory.testDirectory(); - - @Rule - public final ClusterRule clusterRule = new ClusterRule( getClass() ) - .withNumberOfCoreMembers( 3 ) - .withNumberOfEdgeMembers( 0 ); - - @Test - public void shouldRestoreDatabase() throws Throwable - { - File classicDatabaseDir = testDirectory.cleanDirectory( "classic-db" ); - File classicNeo4jStore = RestoreClusterUtils.createClassicNeo4jStore( classicDatabaseDir, 10, StandardV3_0.NAME ); - StoreMetadata storeMetadata = metadataFor( classicNeo4jStore ); - - // when - Path homeDir = Paths.get( testDirectory.cleanDirectory( "new-db-1" ).getPath() ); - String[] args = toArray( ArgsBuilder.args().from( classicNeo4jStore ).database( "graph.db" ).build() ); - - ByteArrayOutputStream output = new ByteArrayOutputStream(); - PrintStream sysOut = new PrintStream( output ); - - new RestoreNewClusterCli( homeDir, homeDir, sysOut ).execute( args ); - - // then - String seed = extractSeed( output.toString() ); - ClusterSeed clusterSeed = ClusterSeed.create( seed ); - - assertTrue( storeMetadata.storeId().equals( clusterSeed.before() ) ); - assertEquals( storeMetadata.lastTxId(), clusterSeed.lastTxId() ); - assertFalse( storeMetadata.storeId().equals( clusterSeed.after() ) ); - - // when restore to another place - Path rootNewDatabaseDir = Paths.get( testDirectory.cleanDirectory( "new-db-2" ).getPath() ); - String[] newArgs = toArray( ArgsBuilder.args() - .from( classicNeo4jStore ).database( "graph.db" ).seed( seed ).build() ); - - new RestoreExistingClusterCli( rootNewDatabaseDir, rootNewDatabaseDir ).execute( newArgs ); - - // then - StoreMetadata newMetadata = metadataFor( extractDatabaseDir( rootNewDatabaseDir.toFile() ) ); - assertTrue( clusterSeed.after().equals( newMetadata.storeId() ) ); - } - - private File extractDatabaseDir( File rootNewDatabaseDir ) - { - Config config = new ConfigLoader( RestoreExistingClusterCli.settings() ).loadConfig( Optional.of( - rootNewDatabaseDir ), - Optional.of( new File( rootNewDatabaseDir, "neo4j.conf" ) ) ); - return config.get( DatabaseManagementSystemSettings.database_path ); - } - - private StoreMetadata metadataFor( File classicNeo4jStore ) throws IOException - { - FileSystemAbstraction fs = new DefaultFileSystemAbstraction(); - File metadataStore = new File( classicNeo4jStore, MetaDataStore.DEFAULT_NAME ); - - try ( PageCache pageCache = StandalonePageCacheFactory.createPageCache( fs ) ) - { - long lastTxId = getRecord( pageCache, metadataStore, LAST_TRANSACTION_ID ); - long upgradeTime = getRecord( pageCache, metadataStore, UPGRADE_TIME ); - long upgradeId = getRecord( pageCache, metadataStore, UPGRADE_TRANSACTION_ID ); - StoreId classicStoreId = storeId( metadataStore, pageCache, upgradeTime, upgradeId ); - return new StoreMetadata( classicStoreId, lastTxId ); - } - } - - public static String extractSeed( String message ) - { - return message.replace( "Cluster Seed: ", "" ).trim(); - } - -} diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/backup/RestoreClusterUtils.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/backup/RestoreClusterUtils.java index fca38a3b13221..5a266672981e9 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/backup/RestoreClusterUtils.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/backup/RestoreClusterUtils.java @@ -26,15 +26,17 @@ import org.neo4j.graphdb.Node; import org.neo4j.graphdb.RelationshipType; import org.neo4j.graphdb.Transaction; -import org.neo4j.graphdb.factory.GraphDatabaseFactory; import org.neo4j.graphdb.factory.GraphDatabaseSettings; +import org.neo4j.io.fs.FileSystemAbstraction; +import org.neo4j.test.TestGraphDatabaseFactory; public class RestoreClusterUtils { - public static File createClassicNeo4jStore( File base, int nodesToCreate, String recordFormat ) + public static File createClassicNeo4jStore( File base, FileSystemAbstraction fileSystem, int nodesToCreate, String recordFormat ) { File existingDbDir = new File( base, "existing" ); - GraphDatabaseService db = new GraphDatabaseFactory() + GraphDatabaseService db = new TestGraphDatabaseFactory() + .setFileSystem( fileSystem ) .newEmbeddedDatabaseBuilder( existingDbDir ) .setConfig( GraphDatabaseSettings.record_format, recordFormat ) .newGraphDatabase(); diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/catchup/storecopy/LocalDatabaseTest.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/catchup/storecopy/LocalDatabaseTest.java deleted file mode 100644 index 97212a1a7d2ef..0000000000000 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/catchup/storecopy/LocalDatabaseTest.java +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Copyright (c) 2002-2016 "Neo Technology," - * Network Engine for Objects in Lund AB [http://neotechnology.com] - * - * This file is part of Neo4j. - * - * Neo4j is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License as - * published by the Free Software Foundation, either version 3 of the - * License, or (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ -package org.neo4j.coreedge.catchup.storecopy; - -import org.junit.Test; - -import java.io.File; - -import org.neo4j.coreedge.identity.StoreId; -import org.neo4j.io.fs.FileSystemAbstraction; -import org.neo4j.kernel.NeoStoreDataSource; -import org.neo4j.kernel.impl.transaction.log.TransactionIdStore; -import org.neo4j.kernel.impl.transaction.state.DataSourceManager; -import org.neo4j.kernel.internal.DatabaseHealth; -import org.neo4j.logging.NullLogProvider; - -import static junit.framework.TestCase.assertEquals; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; -import static org.neo4j.function.Suppliers.singleton; - -public class LocalDatabaseTest -{ - @Test - public void shouldRetrieveStoreId() throws Throwable - { - // given - StoreId storeId = new StoreId( 1, 2, 3, 4 ); - - // when - LocalDatabase localDatabase = createLocalDatabase( new org.neo4j.kernel.impl.store.StoreId( 1, 2, 5, 3, 4 ) ); - localDatabase.start(); - - // then - assertEquals( storeId, localDatabase.storeId() ); - } - - private LocalDatabase createLocalDatabase( org.neo4j.kernel.impl.store.StoreId storeId ) - { - DataSourceManager dataSourceManager = mock( DataSourceManager.class ); - NeoStoreDataSource neoStoreDataSource = mock( NeoStoreDataSource.class ); - when( dataSourceManager.getDataSource() ).thenReturn( neoStoreDataSource ); - when( neoStoreDataSource.getStoreId() ).thenReturn( storeId ); - return new LocalDatabase( new File( "directory" ), - new StoreFiles( mock( FileSystemAbstraction.class ) ), dataSourceManager, - singleton( mock( TransactionIdStore.class ) ), () -> mock( DatabaseHealth.class ), NullLogProvider - .getInstance() ); - } -} diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/convert/ConvertNonCoreEdgeStoreCliTest.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/convert/ConvertNonCoreEdgeStoreCliTest.java deleted file mode 100644 index b59f33518d7ba..0000000000000 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/convert/ConvertNonCoreEdgeStoreCliTest.java +++ /dev/null @@ -1,75 +0,0 @@ -/* - * Copyright (c) 2002-2016 "Neo Technology," - * Network Engine for Objects in Lund AB [http://neotechnology.com] - * - * This file is part of Neo4j. - * - * Neo4j is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License as - * published by the Free Software Foundation, either version 3 of the - * License, or (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ -package org.neo4j.coreedge.convert; - -import org.junit.Test; - -import static junit.framework.TestCase.fail; -import static org.junit.Assert.assertTrue; - -public class ConvertNonCoreEdgeStoreCliTest -{ - - @Test - public void shouldIndicateMissingHomeDir() throws Throwable - { - try - { - // given - ConvertNonCoreEdgeStoreCli.main( new String[]{""} ); - fail( "Should have thrown IllegalArgumentException" ); - } - catch ( IllegalArgumentException exception ) - { - assertTrue(exception.getMessage(), exception.getMessage().contains( "Missing argument 'home-dir'" ) ); - } - } - - @Test - public void shouldIndicateMissingDatabase() throws Throwable - { - try - { - // given - ConvertNonCoreEdgeStoreCli.main( new String[]{"--home-dir", "foo"} ); - fail( "Should have thrown IllegalArgumentException" ); - } - catch ( IllegalArgumentException exception ) - { - assertTrue(exception.getMessage(), exception.getMessage().contains( "Missing argument 'database'" ) ); - } - } - - @Test - public void shouldIndicateMissingConfig() throws Throwable - { - try - { - // given - ConvertNonCoreEdgeStoreCli.main( new String[]{"--home-dir", "foo", "--database", "foo"} ); - fail( "Should have thrown IllegalArgumentException" ); - } - catch ( IllegalArgumentException exception ) - { - assertTrue(exception.getMessage(), exception.getMessage().contains( "Missing argument 'config'" ) ); - } - } - -} diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/core/state/BindingProcessTest.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/core/state/BindingProcessTest.java deleted file mode 100644 index ceab494d3813f..0000000000000 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/core/state/BindingProcessTest.java +++ /dev/null @@ -1,165 +0,0 @@ -/* - * Copyright (c) 2002-2016 "Neo Technology," - * Network Engine for Objects in Lund AB [http://neotechnology.com] - * - * This file is part of Neo4j. - * - * Neo4j is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License as - * published by the Free Software Foundation, either version 3 of the - * License, or (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ -package org.neo4j.coreedge.core.state; - -import org.junit.Test; - -import java.util.Collections; -import java.util.UUID; - -import org.neo4j.coreedge.discovery.CoreTopology; -import org.neo4j.coreedge.identity.ClusterId; -import org.neo4j.logging.Log; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.fail; -import static org.mockito.Mockito.mock; - -public class BindingProcessTest -{ - private Log log = mock( Log.class ); - - @Test - public void notBootstrappableShouldDoNothingWhenNoIdPublished() throws Exception - { - // given - CoreTopology topology = new CoreTopology( null, false, Collections.emptyMap() ); - - BindingProcess binder = new BindingProcess( null, log ); - - // when - ClusterId boundClusterId = binder.attempt( topology ); - - // then - assertNull( boundClusterId ); - } - - @Test - public void notBootstrappableShouldBindToDiscoveredIdWithNoLocalId() throws Exception - { - // given - ClusterId commonClusterId = new ClusterId( UUID.randomUUID() ); - CoreTopology topology = new CoreTopology( commonClusterId, false, Collections.emptyMap() ); - - BindingProcess binder = new BindingProcess( null, log ); - - // when - ClusterId boundClusterId = binder.attempt( topology ); - - // then - assertNotNull( boundClusterId ); - assertEquals( commonClusterId, boundClusterId ); - } - - @Test - public void notBootstrappableShouldBindToMatchingDiscoveredId() throws Exception - { - // given - ClusterId commonClusterId = new ClusterId( UUID.randomUUID() ); - ClusterId localClusterId = new ClusterId( commonClusterId.uuid() ); - - CoreTopology topology = new CoreTopology( commonClusterId, false, Collections.emptyMap() ); - - BindingProcess binder = new BindingProcess( localClusterId, log ); - - // when - ClusterId boundClusterId = binder.attempt( topology ); - - // then - assertNotNull( boundClusterId ); - assertEquals( commonClusterId, boundClusterId ); - } - - @Test - public void notBootstrappableShouldAcceptLocalId() throws Exception - { - // given - ClusterId localClusterId = new ClusterId( UUID.randomUUID() ); - - CoreTopology topology = new CoreTopology( null, false, Collections.emptyMap() ); - - BindingProcess binder = new BindingProcess( localClusterId, log ); - - // when - ClusterId boundClusterId = binder.attempt( topology ); - - // then - assertNotNull( boundClusterId ); - assertEquals( localClusterId, boundClusterId ); - } - - @Test - public void bootstrappableShouldGenerateNewId() throws Exception - { - // given - CoreTopology topology = new CoreTopology( null, true, Collections.emptyMap() ); - - BindingProcess binder = new BindingProcess( null, log ); - - // when - ClusterId boundClusterId = binder.attempt( topology ); - - // then - assertNotNull( boundClusterId ); - assertNotNull( boundClusterId.uuid() ); - } - - @Test - public void bootstrappableShouldPublishLocalId() throws Exception - { - // given - ClusterId localClusterId = new ClusterId( UUID.randomUUID() ); - CoreTopology topology = new CoreTopology( null, true, Collections.emptyMap() ); - - BindingProcess binder = new BindingProcess( localClusterId, log ); - - // when - ClusterId boundClusterId = binder.attempt( topology ); - - // then - assertNotNull( boundClusterId ); - assertEquals( localClusterId, boundClusterId ); - } - - @Test - public void shouldThrowOnClusterIdMismatch() throws Exception - { - // given - ClusterId localClusterId = new ClusterId( UUID.randomUUID() ); - ClusterId commonClusterId = new ClusterId( UUID.randomUUID() ); - CoreTopology topology = new CoreTopology( commonClusterId, false, Collections.emptyMap()); - - // when - BindingProcess binder = new BindingProcess( localClusterId, log ); - - // when - try - { - binder.attempt( topology ); - fail(); - } - catch ( BindingException ignored ) - { - // then: expected - } - } -} diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/core/state/BindingServiceTest.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/core/state/BindingServiceTest.java index 9283bceffe787..bc82b7d0591ca 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/core/state/BindingServiceTest.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/core/state/BindingServiceTest.java @@ -19,123 +19,185 @@ */ package org.neo4j.coreedge.core.state; -import org.junit.Test; - +import java.io.IOException; import java.util.UUID; +import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import org.junit.Test; + +import org.neo4j.coreedge.core.state.snapshot.CoreSnapshot; import org.neo4j.coreedge.core.state.storage.SimpleStorage; import org.neo4j.coreedge.discovery.CoreTopology; import org.neo4j.coreedge.discovery.CoreTopologyService; import org.neo4j.coreedge.identity.ClusterId; +import org.neo4j.function.ThrowingConsumer; +import org.neo4j.logging.NullLogProvider; import org.neo4j.time.Clocks; +import org.neo4j.time.FakeClock; -import static java.lang.Thread.sleep; import static java.util.Collections.emptyMap; import static org.junit.Assert.assertEquals; import static org.junit.Assert.fail; import static org.mockito.Matchers.any; +import static org.mockito.Mockito.atLeast; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -import static org.neo4j.logging.NullLogProvider.getInstance; public class BindingServiceTest { - private CoreTopologyService topologyService = mock( CoreTopologyService.class ); - private SimpleStorage clusterIdStorage = mock( SimpleStorage.class ); + private final CoreBootstrapper coreBootstrapper = mock( CoreBootstrapper.class ); + private final FakeClock clock = Clocks.fakeClock(); @Test - public void shouldTimeoutEventually() throws Throwable + public void shouldTimeoutWhenNotBootrappableAndNobodyElsePublishesClusterId() throws Throwable { // given - CoreTopology topology = new CoreTopology( null, false, emptyMap() ); - - when( topologyService.coreServers() ).thenReturn( topology ); + CoreTopology unboundTopology = new CoreTopology( null, false, emptyMap() ); + CoreTopologyService topologyService = mock( CoreTopologyService.class ); + when( topologyService.coreServers() ).thenReturn( unboundTopology ); - BindingService bindingService = new BindingService( clusterIdStorage, topologyService, - getInstance(), Clocks.systemClock(), () -> sleep( 1 ), 50 ); + BindingService binder = new BindingService( new StubClusterIdStorage(), topologyService, + NullLogProvider.getInstance(), clock, () -> clock.forward( 1, TimeUnit.SECONDS ), 3_000, + coreBootstrapper ); - // when try { - bindingService.start(); - fail(); + // when + binder.bindToCluster( null ); + fail( "Should have timed out" ); } catch ( TimeoutException e ) { - // then: expected + // expected } + + // then + verify( topologyService, atLeast( 2 ) ).coreServers(); } @Test - public void shouldConsiderTopologyChanges() throws Throwable + public void shouldBindToClusterIdPublishedByAnotherMember() throws Throwable { // given - ClusterId commonClusterId = new ClusterId( UUID.randomUUID() ); + ClusterId publishedClusterId = new ClusterId( UUID.randomUUID() ); + CoreTopology unboundTopology = new CoreTopology( null, false, emptyMap() ); + CoreTopology boundTopology = new CoreTopology( publishedClusterId, false, emptyMap() ); - CoreTopology topologyNOK = new CoreTopology( null, false, emptyMap() ); - CoreTopology topologyOK = new CoreTopology( commonClusterId, false, emptyMap() ); - - when( topologyService.coreServers() ).thenReturn( topologyNOK, topologyNOK, topologyNOK, topologyOK ); - when( topologyService.casClusterId( any() ) ).thenReturn( true ); + CoreTopologyService topologyService = mock( CoreTopologyService.class ); + when( topologyService.coreServers() ).thenReturn( unboundTopology ).thenReturn( boundTopology ); - BindingService bindingService = new BindingService( clusterIdStorage, topologyService, - getInstance(), Clocks.systemClock(), () -> sleep( 1 ), 30_000 ); + BindingService binder = new BindingService( new StubClusterIdStorage(), topologyService, + NullLogProvider.getInstance(), clock, () -> clock.forward( 1, TimeUnit.SECONDS ), 3_000, + coreBootstrapper ); // when - bindingService.start(); + ClusterId boundClusterId = binder.bindToCluster( null ); // then - assertEquals( commonClusterId, bindingService.clusterId() ); - verify( topologyService ).casClusterId( any() ); - verify( clusterIdStorage ).writeState( bindingService.clusterId() ); + assertEquals( publishedClusterId, boundClusterId ); + verify( topologyService, atLeast( 2 ) ).coreServers(); } @Test - public void shouldPublishNewId() throws Throwable + public void shouldPublishStoredClusterIdIfPreviouslyBound() throws Throwable { // given - CoreTopology topology = new CoreTopology( null, true, emptyMap() ); + ClusterId previouslyBoundClusterId = new ClusterId( UUID.randomUUID() ); - when( topologyService.coreServers() ).thenReturn( topology ); - when( topologyService.casClusterId( any() ) ).thenReturn( true ); + CoreTopologyService topologyService = mock( CoreTopologyService.class ); + when( topologyService.casClusterId( previouslyBoundClusterId ) ).thenReturn( true ); + + StubClusterIdStorage clusterIdStorage = new StubClusterIdStorage(); + clusterIdStorage.writeState( previouslyBoundClusterId ); - BindingService bindingService = new BindingService( clusterIdStorage, topologyService, - getInstance(), Clocks.systemClock(), () -> sleep( 1 ), 30_000 ); + BindingService binder = new BindingService( clusterIdStorage, topologyService, + NullLogProvider.getInstance(), clock, () -> clock.forward( 1, TimeUnit.SECONDS ), 3_000, + coreBootstrapper ); // when - bindingService.start(); + ClusterId boundClusterId = binder.bindToCluster( null ); // then - verify( topologyService ).casClusterId( bindingService.clusterId() ); - verify( clusterIdStorage ).writeState( bindingService.clusterId() ); + verify( topologyService ).casClusterId( previouslyBoundClusterId ); + assertEquals( previouslyBoundClusterId, boundClusterId ); } @Test - public void shouldPublishOldId() throws Throwable + public void shouldFailToPublishMismatchingStoredClusterId() throws Throwable { // given - CoreTopology topology = new CoreTopology( null, true, emptyMap() ); - ClusterId localClusterId = new ClusterId( UUID.randomUUID() ); + ClusterId previouslyBoundClusterId = new ClusterId( UUID.randomUUID() ); + + CoreTopologyService topologyService = mock( CoreTopologyService.class ); + when( topologyService.casClusterId( previouslyBoundClusterId ) ).thenReturn( false ); - when( clusterIdStorage.exists() ).thenReturn( true ); - when( clusterIdStorage.readState() ).thenReturn( localClusterId ); + StubClusterIdStorage clusterIdStorage = new StubClusterIdStorage(); + clusterIdStorage.writeState( previouslyBoundClusterId ); - when( topologyService.coreServers() ).thenReturn( topology ); + BindingService binder = new BindingService( clusterIdStorage, topologyService, + NullLogProvider.getInstance(), clock, () -> clock.forward( 1, TimeUnit.SECONDS ), 3_000, + coreBootstrapper ); + + // when + try + { + binder.bindToCluster( null ); + fail( "Should have thrown exception" ); + } + catch ( BindingException e ) + { + // expected + } + } + + @Test + public void shouldBootstrapWhenBootstrappable() throws Throwable + { + // given + CoreTopology bootstrappableTopology = new CoreTopology( null, true, emptyMap() ); + + CoreTopologyService topologyService = mock( CoreTopologyService.class ); + when( topologyService.coreServers() ).thenReturn( bootstrappableTopology ); when( topologyService.casClusterId( any() ) ).thenReturn( true ); - BindingService bindingService = new BindingService( clusterIdStorage, topologyService, - getInstance(), Clocks.systemClock(), () -> sleep( 1 ), 30_000 ); + BindingService binder = new BindingService( new StubClusterIdStorage(), topologyService, + NullLogProvider.getInstance(), clock, () -> clock.forward( 1, TimeUnit.SECONDS ), 3_000, + coreBootstrapper ); + + ThrowingConsumer snapshotInstaller = mock( ThrowingConsumer.class ); // when - bindingService.start(); + ClusterId boundClusterId = binder.bindToCluster( snapshotInstaller ); // then - assertEquals( localClusterId, bindingService.clusterId() ); - verify( topologyService ).casClusterId( localClusterId ); - verify( clusterIdStorage, never() ).writeState( any() ); + verify( coreBootstrapper ).bootstrap( any() ); + verify( topologyService ).casClusterId( boundClusterId ); + verify( snapshotInstaller ).accept( any() ); + } + + private class StubClusterIdStorage implements SimpleStorage + { + private ClusterId clusterId; + + @Override + public boolean exists() + { + return clusterId != null; + } + + @Override + public ClusterId readState() throws IOException + { + return clusterId; + } + + @Override + public void writeState( ClusterId state ) throws IOException + { + clusterId = state; + } } } diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/core/state/CoreBootstrapperTest.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/core/state/CoreBootstrapperTest.java new file mode 100644 index 0000000000000..1fe9a2baf336f --- /dev/null +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/core/state/CoreBootstrapperTest.java @@ -0,0 +1,109 @@ +/* + * Copyright (c) 2002-2016 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package org.neo4j.coreedge.core.state; + +import java.io.File; +import java.util.Set; + +import org.junit.Rule; +import org.junit.Test; + +import org.neo4j.coreedge.backup.RestoreClusterUtils; +import org.neo4j.coreedge.core.replication.session.GlobalSessionTrackerState; +import org.neo4j.coreedge.core.state.machines.id.IdAllocationState; +import org.neo4j.coreedge.core.state.machines.locks.ReplicatedLockTokenState; +import org.neo4j.coreedge.core.state.machines.tx.LastCommittedIndexFinder; +import org.neo4j.coreedge.core.state.snapshot.CoreSnapshot; +import org.neo4j.coreedge.core.state.snapshot.CoreStateType; +import org.neo4j.coreedge.core.state.snapshot.RaftCoreState; +import org.neo4j.coreedge.identity.MemberId; +import org.neo4j.io.fs.DefaultFileSystemAbstraction; +import org.neo4j.io.pagecache.PageCache; +import org.neo4j.kernel.configuration.Config; +import org.neo4j.kernel.impl.store.format.standard.StandardV3_0; +import org.neo4j.kernel.impl.store.id.IdType; +import org.neo4j.kernel.impl.transaction.log.ReadOnlyTransactionIdStore; +import org.neo4j.kernel.impl.transaction.log.ReadOnlyTransactionStore; +import org.neo4j.kernel.monitoring.Monitors; +import org.neo4j.logging.NullLogProvider; +import org.neo4j.test.rule.PageCacheRule; +import org.neo4j.test.rule.TestDirectory; + +import static java.util.UUID.randomUUID; + +import static org.junit.Assert.assertEquals; + +import static org.neo4j.helpers.collection.Iterators.asSet; + +public class CoreBootstrapperTest +{ + @Rule + public TestDirectory testDirectory = TestDirectory.testDirectory(); + + @Rule + public PageCacheRule pageCacheRule = new PageCacheRule(); + + private DefaultFileSystemAbstraction fsa = new DefaultFileSystemAbstraction(); + + @Test + public void shouldSetAllCoreState() throws Exception + { + // given + int nodeCount = 100; + File classicNeo4jStore = RestoreClusterUtils.createClassicNeo4jStore( + testDirectory.directory(), fsa, nodeCount, StandardV3_0.NAME ); + + PageCache pageCache = pageCacheRule.getPageCache( fsa ); + CoreBootstrapper bootstrapper = new CoreBootstrapper( + classicNeo4jStore, pageCache, fsa, Config.defaults() ); + + // when + Set membership = asSet( randomMember(), randomMember(), randomMember() ); + CoreSnapshot snapshot = bootstrapper.bootstrap( membership ); + + // then + assertEquals( nodeCount, ((IdAllocationState) snapshot.get( CoreStateType.ID_ALLOCATION )) + .firstUnallocated( IdType.NODE ) ); + + /* Bootstrapped state is created in RAFT land at index -1 and term -1. */ + assertEquals( 0, snapshot.prevIndex() ); + assertEquals( 0, snapshot.prevTerm() ); + + /* Lock is initially not taken. */ + assertEquals( new ReplicatedLockTokenState(), snapshot.get( CoreStateType.LOCK_TOKEN ) ); + + /* Raft has the bootstrapped set of members initially. */ + assertEquals( membership, ((RaftCoreState) snapshot.get( CoreStateType.RAFT_CORE_STATE )).committed().members() ); + + /* The session state is initially empty. */ + assertEquals( new GlobalSessionTrackerState(), snapshot.get( CoreStateType.SESSION_TRACKER ) ); + + LastCommittedIndexFinder lastCommittedIndexFinder = new LastCommittedIndexFinder( new ReadOnlyTransactionIdStore( pageCache, classicNeo4jStore ), new ReadOnlyTransactionStore( + pageCache, fsa, classicNeo4jStore, new Monitors() ), NullLogProvider.getInstance() ); + + long lastCommittedIndex = lastCommittedIndexFinder.getLastCommittedIndex(); + assertEquals( -1, lastCommittedIndex ); + } + + private MemberId randomMember() + { + return new MemberId( randomUUID() ); + } +} diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/core/state/DumpClusterStateTest.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/core/state/DumpClusterStateTest.java index 1ee9f2dede2c9..0cd3a6dcf53b7 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/core/state/DumpClusterStateTest.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/core/state/DumpClusterStateTest.java @@ -35,6 +35,7 @@ import org.neo4j.coreedge.core.state.machines.id.IdAllocationState; import org.neo4j.coreedge.core.state.machines.locks.ReplicatedLockTokenState; import org.neo4j.coreedge.core.state.storage.DurableStateStorage; +import org.neo4j.coreedge.core.state.storage.SimpleFileStorage; import org.neo4j.coreedge.core.state.storage.SimpleStorage; import org.neo4j.coreedge.core.state.storage.StateMarshal; import org.neo4j.coreedge.identity.MemberId; @@ -43,7 +44,7 @@ import org.neo4j.test.rule.fs.EphemeralFileSystemRule; import static org.junit.Assert.assertEquals; -import static org.neo4j.coreedge.ReplicationModule.LAST_FLUSHED_NAME; +import static org.neo4j.coreedge.core.server.CoreServerModule.LAST_FLUSHED_NAME; import static org.neo4j.coreedge.ReplicationModule.SESSION_TRACKER_NAME; import static org.neo4j.coreedge.core.IdentityModule.CORE_MEMBER_ID_NAME; import static org.neo4j.coreedge.core.consensus.ConsensusModule.RAFT_MEMBERSHIP_NAME; @@ -76,7 +77,7 @@ public void shouldDumpClusterState() throws Exception private void createStates() throws IOException { - SimpleStorage memberIdStorage = new SimpleStorage<>( fsa.get(), clusterStateDirectory, CORE_MEMBER_ID_NAME, new MemberId.Marshal(), NullLogProvider.getInstance() ); + SimpleStorage memberIdStorage = new SimpleFileStorage<>( fsa.get(), clusterStateDirectory, CORE_MEMBER_ID_NAME, new MemberId.Marshal(), NullLogProvider.getInstance() ); memberIdStorage.writeState( new MemberId( UUID.randomUUID() ) ); createDurableState( LAST_FLUSHED_NAME, new LongIndexMarshal() ); diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/core/state/storage/SimpleStorageTest.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/core/state/storage/SimpleStorageTest.java index 84a11d5d16db8..1b077040c2a90 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/core/state/storage/SimpleStorageTest.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/core/state/storage/SimpleStorageTest.java @@ -40,7 +40,7 @@ public class SimpleStorageTest public void shouldWriteAndReadState() throws Exception { // given - SimpleStorage storage = new SimpleStorage<>( fsa.get(), new File( "state-dir" ), "member-id-a", new MemberId.Marshal(), NullLogProvider.getInstance() ); + SimpleStorage storage = new SimpleFileStorage<>( fsa.get(), new File( "state-dir" ), "member-id-a", new MemberId.Marshal(), NullLogProvider.getInstance() ); // when MemberId idA = new MemberId( UUID.randomUUID() ); diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/discovery/Cluster.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/discovery/Cluster.java index f5c07cb8005b6..48fef7a073726 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/discovery/Cluster.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/discovery/Cluster.java @@ -67,7 +67,7 @@ public class Cluster { private static final int DEFAULT_TIMEOUT_MS = 15_000; private static final int DEFAULT_BACKOFF_MS = 100; - public final int defaultClusterSize = 3; + private static final int DEFAULT_CLUSTER_SIZE = 3; private final File parentDir; private final DiscoveryServiceFactory discoveryServiceFactory; @@ -142,7 +142,7 @@ public CoreClusterMember addCoreMemberWithId( int memberId ) public CoreClusterMember addCoreMemberWithIdAndInitialMembers( int memberId, List initialMembers ) { - CoreClusterMember coreClusterMember = new CoreClusterMember( memberId, defaultClusterSize, initialMembers, + CoreClusterMember coreClusterMember = new CoreClusterMember( memberId, DEFAULT_CLUSTER_SIZE, initialMembers, discoveryServiceFactory, StandardV3_0.NAME, parentDir, emptyMap(), emptyMap() ); coreMembers.put( memberId, coreClusterMember ); @@ -309,7 +309,7 @@ public CoreClusterMember coreTx( BiConsumer op ) private CoreClusterMember addCoreMemberWithId( int memberId, Map extraParams, Map> instanceExtraParams, String recordFormat ) { List advertisedAddress = buildAddresses( coreMembers.keySet() ); - CoreClusterMember coreClusterMember = new CoreClusterMember( memberId, defaultClusterSize, advertisedAddress, + CoreClusterMember coreClusterMember = new CoreClusterMember( memberId, DEFAULT_CLUSTER_SIZE, advertisedAddress, discoveryServiceFactory, recordFormat, parentDir, extraParams, instanceExtraParams ); coreMembers.put( memberId, coreClusterMember ); @@ -396,7 +396,6 @@ private void createCoreMembers( final int noOfCoreMembers, List addresses, Map extraParams, Map> instanceExtraParams, String recordFormat ) { - for ( int i = 0; i < noOfCoreMembers; i++ ) { CoreClusterMember coreClusterMember = new CoreClusterMember( i, noOfCoreMembers, addresses, @@ -465,11 +464,18 @@ private void shutdownEdgeMembers() public static void dataMatchesEventually( CoreClusterMember member, Collection targetDBs ) throws TimeoutException, InterruptedException { - CoreGraphDatabase sourceDB = member.database(); - DbRepresentation sourceRepresentation = DbRepresentation.of( sourceDB ); + dataMatchesEventually( DbRepresentation.of( member.database() ), targetDBs ); + } + + public static void dataMatchesEventually( DbRepresentation sourceRepresentation, Collection targetDBs ) + throws TimeoutException, InterruptedException + { for ( CoreClusterMember targetDB : targetDBs ) { - Predicates.await( () -> sourceRepresentation.equals( DbRepresentation.of( targetDB.database() ) ), + Predicates.await( () -> { + DbRepresentation representation = DbRepresentation.of( targetDB.database() ); + return sourceRepresentation.equals( representation ); + }, DEFAULT_TIMEOUT_MS, TimeUnit.MILLISECONDS ); } } diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/discovery/CoreClusterMember.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/discovery/CoreClusterMember.java index 9d7c6f747223d..32ecccdc0436f 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/discovery/CoreClusterMember.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/discovery/CoreClusterMember.java @@ -153,4 +153,9 @@ public String toString() { return format( "CoreClusterMember{serverId=%d}", serverId ); } + + public int serverId() + { + return serverId; + } } diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/discovery/SharedDiscoveryService.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/discovery/SharedDiscoveryService.java index 659bdc3c92b15..608be6ceafffd 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/discovery/SharedDiscoveryService.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/discovery/SharedDiscoveryService.java @@ -35,7 +35,6 @@ import org.neo4j.coreedge.identity.MemberId; import org.neo4j.helpers.AdvertisedSocketAddress; import org.neo4j.kernel.configuration.Config; -import org.neo4j.kernel.impl.logging.LogService; import org.neo4j.logging.LogProvider; import static java.util.Collections.unmodifiableMap; diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/edge/EdgeStartupProcessTest.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/edge/EdgeStartupProcessTest.java index 48080634e92df..69dae4991e33c 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/edge/EdgeStartupProcessTest.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/edge/EdgeStartupProcessTest.java @@ -87,9 +87,8 @@ txPulling, new AlwaysChooseFirstMember( hazelcastTopology ), edgeStartupProcess.start(); // then - verify( localDatabase ).stop(); verify( storeFetcher ).copyStore( any(), any(), any() ); - verify( localDatabase, times( 2 ) ).start(); // once for initial start, once for after store copy + verify( localDatabase ).start(); verify( txPulling ).start(); } diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/scenarios/ClusterIdentityIT.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/scenarios/ClusterIdentityIT.java index e52ded2ef9990..8351834262f70 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/scenarios/ClusterIdentityIT.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/scenarios/ClusterIdentityIT.java @@ -121,8 +121,8 @@ public void whenWeRestartTheClusterAllServersShouldStillHaveTheSameStoreId() thr assertAllStoresHaveTheSameStoreId( coreStoreDirs, fs ); } - // TODO: Fix this test by having the bootstrapper augment his store and bind it using store-id on disk. - @Ignore + @Test + @Ignore( "Fix this test by having the bootstrapper augment his store and bind it using store-id on disk." ) public void shouldNotJoinClusterIfHasDataWithDifferentStoreId() throws Exception { // GIVEN diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/scenarios/ClusterMembershipChangeIT.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/scenarios/ClusterMembershipChangeIT.java index 739d78cf779a2..cb3993c2a7bdd 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/scenarios/ClusterMembershipChangeIT.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/scenarios/ClusterMembershipChangeIT.java @@ -21,6 +21,7 @@ import org.junit.Ignore; import org.junit.Rule; +import org.junit.Test; import java.util.List; @@ -50,6 +51,7 @@ public class ClusterMembershipChangeIT public final ClusterRule clusterRule = new ClusterRule( getClass() ).withDiscoveryServiceFactory( new HazelcastDiscoveryServiceFactory() ) .withNumberOfCoreMembers( 3 ); + @Test @Ignore( "Incomplete, HC will hang waiting for others to join." ) public void newMemberNotInInitialMembersConfig() throws Throwable { diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/scenarios/ConvertNonCoreEdgeStoreIT.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/scenarios/ConvertNonCoreEdgeStoreIT.java index 8e9407e02e140..8061191f8ab8f 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/scenarios/ConvertNonCoreEdgeStoreIT.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/scenarios/ConvertNonCoreEdgeStoreIT.java @@ -19,28 +19,24 @@ */ package org.neo4j.coreedge.scenarios; -import java.io.ByteArrayOutputStream; import java.io.File; -import java.io.PrintStream; -import java.nio.file.Path; -import java.nio.file.Paths; import java.util.Arrays; import java.util.Collection; +import org.junit.Ignore; import org.junit.Rule; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; -import org.neo4j.coreedge.backup.RestoreClusterCliTest; -import org.neo4j.coreedge.backup.RestoreExistingClusterCli; -import org.neo4j.coreedge.backup.RestoreNewClusterCli; import org.neo4j.coreedge.core.CoreGraphDatabase; import org.neo4j.coreedge.discovery.Cluster; import org.neo4j.coreedge.discovery.CoreClusterMember; import org.neo4j.function.ThrowingSupplier; import org.neo4j.graphdb.Node; import org.neo4j.graphdb.Transaction; +import org.neo4j.io.fs.DefaultFileSystemAbstraction; +import org.neo4j.io.fs.FileUtils; import org.neo4j.kernel.configuration.Config; import org.neo4j.kernel.impl.store.format.highlimit.HighLimit; import org.neo4j.kernel.impl.store.format.standard.StandardV3_0; @@ -51,8 +47,6 @@ import static org.hamcrest.Matchers.greaterThan; import static org.junit.Assert.assertEquals; -import static org.neo4j.coreedge.backup.ArgsBuilder.args; -import static org.neo4j.coreedge.backup.ArgsBuilder.toArray; import static org.neo4j.coreedge.backup.RestoreClusterUtils.createClassicNeo4jStore; import static org.neo4j.coreedge.core.CoreEdgeClusterSettings.raft_advertised_address; import static org.neo4j.graphdb.Label.label; @@ -85,26 +79,14 @@ public void shouldReplicateTransactionToCoreMembers() throws Throwable // given File dbDir = clusterRule.testDirectory().cleanDirectory( "classic-db" ); int classicNodeCount = 1024; - File classicNeo4jStore = createClassicNeo4jStore( dbDir, classicNodeCount, recordFormat ); + File classicNeo4jStore = createClassicNeo4jStore( dbDir, new DefaultFileSystemAbstraction(), classicNodeCount, recordFormat ); Cluster cluster = this.clusterRule.withRecordFormat( recordFormat ).createCluster(); - Path homeDir = Paths.get(cluster.getCoreMemberById( 0 ).homeDir().getPath()); - - ByteArrayOutputStream output = new ByteArrayOutputStream(); - PrintStream sysOut = new PrintStream( output ); - - new RestoreNewClusterCli( homeDir, homeDir, sysOut ).execute( - toArray( args().from( classicNeo4jStore ).database( "graph.db" ).force().build() ) ); - - String seed = RestoreClusterCliTest.extractSeed( output.toString() ); - - for ( int serverId = 1; serverId < CLUSTER_SIZE; serverId++ ) + for ( int serverId = 0; serverId < CLUSTER_SIZE; serverId++ ) { - Path destination = Paths.get(cluster.getCoreMemberById( serverId ).homeDir().getPath()); - - new RestoreExistingClusterCli( destination, destination ).execute( - toArray( args().from( classicNeo4jStore ).database( "graph.db" ).seed( seed ).force().build() ) ); + File destination = cluster.getCoreMemberById( serverId ).storeDir(); + FileUtils.copyRecursively( classicNeo4jStore, destination ); } cluster.start(); diff --git a/enterprise/core-edge/src/test/java/org/neo4j/test/coreedge/ClusterRule.java b/enterprise/core-edge/src/test/java/org/neo4j/test/coreedge/ClusterRule.java index 2c0a26b2c8afb..06d64ec1a83ca 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/test/coreedge/ClusterRule.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/test/coreedge/ClusterRule.java @@ -113,7 +113,6 @@ public Cluster createCluster() throws Exception { cluster = new Cluster( clusterDirectory, noCoreMembers, noEdgeMembers, discoveryServiceFactory, coreParams, instanceCoreParams, edgeParams, instanceEdgeParams, recordFormat ); - } return cluster;