diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/storecopy/StoreCopyFailedException.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/storecopy/StoreCopyFailedException.java index 984696454108..f74301b95755 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/storecopy/StoreCopyFailedException.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/storecopy/StoreCopyFailedException.java @@ -25,4 +25,9 @@ public StoreCopyFailedException( Throwable cause ) { super( cause ); } + + public StoreCopyFailedException( String message ) + { + super( message ); + } } diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/RaftInstance.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/RaftInstance.java index 7adf0d97efa8..834cbc7d5ac8 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/RaftInstance.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/RaftInstance.java @@ -246,16 +246,11 @@ public ReadableRaftState state() return state; } - public void downloadSnapshot() - { - raftStateMachine.downloadSnapshot(); - } - private void checkForSnapshotNeed( Outcome outcome ) { if( outcome.needsFreshSnapshot() ) { - downloadSnapshot(); + raftStateMachine.notifyNeedFreshSnapshot(); } } diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/RaftStateMachine.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/RaftStateMachine.java index 1c926929af84..c64df9b25d8d 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/RaftStateMachine.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/RaftStateMachine.java @@ -33,8 +33,8 @@ default void notifyCommitted( long commitIndex ) {} /** * Download and install a snapshot of state from another member of the cluster. *

- * Called when the consensus system no longer has the log entries required to further update the state machine, - * because they have been deleted through pruning. + * Called when the consensus system no longer has the log entries required to + * further update the state machine, because they have been deleted through pruning. */ - default void downloadSnapshot() {} + default void notifyNeedFreshSnapshot() {} } diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/state/CoreState.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/state/CoreState.java index ccb47808e15e..1efd98a40618 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/state/CoreState.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/state/CoreState.java @@ -22,33 +22,39 @@ import java.io.IOException; import java.util.HashMap; import java.util.Map; -import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; -import java.util.concurrent.TimeUnit; import java.util.function.Supplier; +import org.neo4j.coreedge.catchup.storecopy.StoreCopyFailedException; import org.neo4j.coreedge.catchup.storecopy.core.RaftStateType; +import org.neo4j.coreedge.discovery.CoreServerSelectionException; +import org.neo4j.coreedge.raft.RaftStateMachine; +import org.neo4j.coreedge.raft.log.RaftLog; import org.neo4j.coreedge.raft.log.RaftLogCompactedException; import org.neo4j.coreedge.raft.log.RaftLogCursor; -import org.neo4j.coreedge.raft.log.ReadableRaftLog; import org.neo4j.coreedge.raft.replication.DistributedOperation; import org.neo4j.coreedge.raft.replication.ProgressTracker; import org.neo4j.coreedge.raft.replication.session.GlobalSessionTrackerState; import org.neo4j.coreedge.raft.replication.tx.CoreReplicatedContent; +import org.neo4j.coreedge.server.AdvertisedSocketAddress; import org.neo4j.coreedge.server.CoreMember; +import org.neo4j.coreedge.server.edge.CoreServerSelectionStrategy; +import org.neo4j.helpers.NamedThreadFactory; import org.neo4j.kernel.internal.DatabaseHealth; import org.neo4j.kernel.lifecycle.LifecycleAdapter; import org.neo4j.logging.Log; import org.neo4j.logging.LogProvider; +import static java.lang.String.format; +import static java.util.concurrent.Executors.newSingleThreadExecutor; import static java.util.concurrent.TimeUnit.HOURS; -public class CoreState extends LifecycleAdapter +public class CoreState extends LifecycleAdapter implements RaftStateMachine { private static final long NOTHING = -1; private CoreStateMachines coreStateMachines; - private final ReadableRaftLog raftLog; + private final RaftLog raftLog; private final StateStorage lastFlushedStorage; private final int flushEvery; private final ProgressTracker progressTracker; @@ -59,21 +65,25 @@ public class CoreState extends LifecycleAdapter private final Log log; private long lastApplied = NOTHING; - private ExecutorService executor; - private long lastSeenCommitIndex = NOTHING; private long lastFlushed = NOTHING; + private final CoreServerSelectionStrategy selectionStrategy; + private final CoreStateDownloader downloader; + + private ExecutorService applier; + public CoreState( - ReadableRaftLog raftLog, - ExecutorService executor, + RaftLog raftLog, int flushEvery, Supplier dbHealth, LogProvider logProvider, ProgressTracker progressTracker, StateStorage lastFlushedStorage, StateStorage lastApplyingStorage, - StateStorage> sessionStorage ) + StateStorage> sessionStorage, + CoreServerSelectionStrategy selectionStrategy, + CoreStateDownloader downloader ) { this.raftLog = raftLog; this.lastFlushedStorage = lastFlushedStorage; @@ -81,27 +91,33 @@ public CoreState( this.progressTracker = progressTracker; this.lastApplyingStorage = lastApplyingStorage; this.sessionStorage = sessionStorage; + this.downloader = downloader; + this.selectionStrategy = selectionStrategy; this.log = logProvider.getLog( getClass() ); this.dbHealth = dbHealth; - this.executor = executor; } - public void setStateMachine( CoreStateMachines coreStateMachines, long lastApplied ) + public synchronized void setStateMachine( CoreStateMachines coreStateMachines, long lastApplied ) { this.coreStateMachines = coreStateMachines; this.lastApplied = this.lastFlushed = lastApplied; } + @Override public synchronized void notifyCommitted( long commitIndex ) { if ( this.lastSeenCommitIndex != commitIndex ) { this.lastSeenCommitIndex = commitIndex; - executor.execute( () -> { + applier.submit( () -> { try { applyUpTo( commitIndex ); } + catch( InterruptedException e ) + { + log.warn( "Interrupted while applying", e ); + } catch ( Throwable e ) { log.error( "Failed to apply up to index " + commitIndex, e ); @@ -111,7 +127,52 @@ public synchronized void notifyCommitted( long commitIndex ) } } - private void applyUpTo( long lastToApply ) throws IOException, RaftLogCompactedException + @Override + public synchronized void notifyNeedFreshSnapshot() + { + try + { + downloadSnapshot( selectionStrategy.coreServer() ); + } + catch ( CoreServerSelectionException | InterruptedException | StoreCopyFailedException e ) + { + log.error( "Failed to download snapshot", e ); + } + } + + /** + * Compacts the core state. + * + * @throws IOException + */ + public void compact() throws IOException + { + try + { + raftLog.prune( lastFlushed ); + } + catch ( RaftLogCompactedException e ) + { + log.warn( "Log already pruned?", e ); + } + } + + /** + * Attempts to download a fresh snapshot from another core instance. + * + * @param source The source address to attempt a download of a snapshot from. + */ + public synchronized void downloadSnapshot( AdvertisedSocketAddress source ) throws InterruptedException, StoreCopyFailedException + { + if( !syncExecutor( true, true ) ) + { + throw new StoreCopyFailedException( "Failed to synchronize with executor" ); + } + + downloader.downloadSnapshot( source, this ); + } + + private void applyUpTo( long lastToApply ) throws IOException, RaftLogCompactedException, InterruptedException { try ( RaftLogCursor cursor = raftLog.getEntryCursor( lastApplied + 1 ) ) { @@ -129,6 +190,12 @@ private void applyUpTo( long lastToApply ) throws IOException, RaftLogCompactedE maybeFlush(); } + + if( Thread.interrupted() ) + { + throw new InterruptedException( + format( "Interrupted while applying at lastApplied=%d with lastToApply=%d", lastApplied, lastToApply ) ); + } } } } @@ -151,32 +218,76 @@ private void maybeFlush() throws IOException { if ( lastApplied % this.flushEvery == 0 ) { - coreStateMachines.flush(); - sessionStorage.persistStoreData( sessionState ); - lastFlushedStorage.persistStoreData( lastApplied ); - lastFlushed = lastApplied; + flush(); } } + private void flush() throws IOException + { + coreStateMachines.flush(); + sessionStorage.persistStoreData( sessionState ); + lastFlushedStorage.persistStoreData( lastApplied ); + lastFlushed = lastApplied; + } + + /** + * Used for synchronizing with the internal executor. + * + * @param cancelTasks Tries to cancel pending tasks. + * @param willContinue The executor should continue to accept tasks. + * + * @return Returns true if the executor managed to synchronize with the executor, meaning + * it successfully finished pending tasks and is now idle. Otherwise false. + * + * @throws InterruptedException + */ + boolean syncExecutor( boolean cancelTasks, boolean willContinue ) throws InterruptedException + { + boolean isSuccess = true; + + if( applier != null ) + { + if( cancelTasks ) + { + applier.shutdownNow(); + } + else + { + applier.shutdown(); + } + + if( !applier.awaitTermination( 1, HOURS ) ) + { + log.error( "Applier did not terminate in 1 hour." ); + isSuccess = false; + } + } + + if( willContinue ) + { + applier = newSingleThreadExecutor( new NamedThreadFactory( "core-state-applier" ) ); + } + + return isSuccess; + } + @Override - public synchronized void start() throws IOException, RaftLogCompactedException + public synchronized void start() throws IOException, RaftLogCompactedException, InterruptedException { lastFlushed = lastApplied = lastFlushedStorage.getInitialState(); sessionState = sessionStorage.getInitialState(); + syncExecutor( false, true ); applyUpTo( lastApplyingStorage.getInitialState() ); } @Override - public void stop() throws Throwable - { - executor.shutdown(); - executor.awaitTermination( 1, HOURS ); - } - - public long lastFlushed() + public synchronized void stop() throws Throwable { - return lastFlushed; + if( syncExecutor( true, false ) ) + { + flush(); + } } public synchronized Map snapshot() diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/state/CoreStateDownloader.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/state/CoreStateDownloader.java new file mode 100644 index 000000000000..23eb529c3894 --- /dev/null +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/state/CoreStateDownloader.java @@ -0,0 +1,69 @@ +/* + * Copyright (c) 2002-2016 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package org.neo4j.coreedge.raft.state; + +import java.io.IOException; + +import org.neo4j.coreedge.catchup.storecopy.LocalDatabase; +import org.neo4j.coreedge.catchup.storecopy.StoreCopyFailedException; +import org.neo4j.coreedge.catchup.storecopy.edge.StoreFetcher; +import org.neo4j.coreedge.catchup.storecopy.edge.state.StateFetcher; +import org.neo4j.coreedge.server.AdvertisedSocketAddress; +import org.neo4j.logging.Log; +import org.neo4j.logging.LogProvider; + +public class CoreStateDownloader +{ + private final LocalDatabase localDatabase; + private final StoreFetcher storeFetcher; + private final StateFetcher stateFetcher; + private final Log log; + + public CoreStateDownloader( LocalDatabase localDatabase, StoreFetcher storeFetcher, StateFetcher stateFetcher, LogProvider logProvider ) + { + this.localDatabase = localDatabase; + this.storeFetcher = storeFetcher; + this.stateFetcher = stateFetcher; + this.log = logProvider.getLog( getClass() ); + } + + void downloadSnapshot( AdvertisedSocketAddress source, CoreState receiver ) throws InterruptedException, StoreCopyFailedException + { + localDatabase.stop(); + + try + { + log.info( "Downloading snapshot from core server at %s", source ); + + localDatabase.copyStoreFrom( source, storeFetcher ); + stateFetcher.copyRaftState( source, receiver ); + + localDatabase.start(); + } + catch ( StoreCopyFailedException e ) + { + log.warn( "Failed to download snapshot", e ); + } + catch ( IOException e ) + { + localDatabase.panic( e ); + } + } +} diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/core/CoreEditionSPI.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/core/CoreEditionSPI.java index 42bc140d86dd..43eb3bd6e12e 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/core/CoreEditionSPI.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/core/CoreEditionSPI.java @@ -19,6 +19,9 @@ */ package org.neo4j.coreedge.server.core; +import java.io.IOException; + +import org.neo4j.coreedge.catchup.storecopy.StoreCopyFailedException; import org.neo4j.coreedge.raft.roles.Role; import org.neo4j.coreedge.server.AdvertisedSocketAddress; import org.neo4j.coreedge.server.CoreMember; @@ -30,7 +33,7 @@ interface CoreEditionSPI extends EditionModule.SPI Role currentRole(); - void downloadSnapshot( AdvertisedSocketAddress source ); + void downloadSnapshot( AdvertisedSocketAddress source ) throws InterruptedException, StoreCopyFailedException; - void compact(); + void compact() throws IOException; } diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/core/CoreGraphDatabase.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/core/CoreGraphDatabase.java index f0ffd8e9c1d5..aa27bdb11732 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/core/CoreGraphDatabase.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/core/CoreGraphDatabase.java @@ -20,8 +20,10 @@ package org.neo4j.coreedge.server.core; import java.io.File; +import java.io.IOException; import java.util.Map; +import org.neo4j.coreedge.catchup.storecopy.StoreCopyFailedException; import org.neo4j.coreedge.discovery.DiscoveryServiceFactory; import org.neo4j.coreedge.discovery.HazelcastDiscoveryServiceFactory; import org.neo4j.coreedge.raft.roles.Role; @@ -60,12 +62,12 @@ public Role getRole() return coreEditionSPI.currentRole(); } - public void downloadSnapshot( AdvertisedSocketAddress source ) + public void downloadSnapshot( AdvertisedSocketAddress source ) throws InterruptedException, StoreCopyFailedException { coreEditionSPI.downloadSnapshot( source ); } - public void compact() + public void compact() throws IOException { coreEditionSPI.compact(); } diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/core/CoreStateMachine.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/core/CoreStateMachine.java deleted file mode 100644 index 7f25a5537215..000000000000 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/core/CoreStateMachine.java +++ /dev/null @@ -1,160 +0,0 @@ -/* - * Copyright (c) 2002-2016 "Neo Technology," - * Network Engine for Objects in Lund AB [http://neotechnology.com] - * - * This file is part of Neo4j. - * - * Neo4j is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License as - * published by the Free Software Foundation, either version 3 of the - * License, or (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ -package org.neo4j.coreedge.server.core; - -import java.io.IOException; -import java.util.concurrent.TimeUnit; - -import org.neo4j.coreedge.catchup.storecopy.LocalDatabase; -import org.neo4j.coreedge.catchup.storecopy.StoreCopyFailedException; -import org.neo4j.coreedge.catchup.storecopy.edge.StoreFetcher; -import org.neo4j.coreedge.catchup.storecopy.edge.state.StateFetcher; -import org.neo4j.coreedge.discovery.CoreServerSelectionException; -import org.neo4j.coreedge.raft.RaftStateMachine; -import org.neo4j.coreedge.raft.log.RaftLog; -import org.neo4j.coreedge.raft.log.RaftLogCompactedException; -import org.neo4j.coreedge.raft.replication.tx.ConstantTimeRetryStrategy; -import org.neo4j.coreedge.raft.replication.tx.RetryStrategy; -import org.neo4j.coreedge.raft.state.CoreState; -import org.neo4j.coreedge.server.AdvertisedSocketAddress; -import org.neo4j.coreedge.server.edge.CoreServerSelectionStrategy; -import org.neo4j.kernel.lifecycle.LifecycleAdapter; -import org.neo4j.logging.Log; -import org.neo4j.logging.LogProvider; - -// TODO: This almost only deals with downloading. Refactor together with CoreState. -public class CoreStateMachine extends LifecycleAdapter implements RaftStateMachine -{ - private final CoreState coreState; - private final LocalDatabase localDatabase; - private final CoreServerSelectionStrategy selectionStrategy; - private final StoreFetcher storeFetcher; - private final StateFetcher stateFetcher; - private final RaftLog raftLog; - private final Log log; - - public CoreStateMachine( - CoreState coreState, - LocalDatabase localDatabase, - CoreServerSelectionStrategy selectionStrategy, - StoreFetcher storeFetcher, - StateFetcher stateFetcher, - LogProvider logProvider, - RaftLog raftLog ) - { - this.coreState = coreState; - this.localDatabase = localDatabase; - this.selectionStrategy = selectionStrategy; - this.storeFetcher = storeFetcher; - this.stateFetcher = stateFetcher; - this.log = logProvider.getLog( getClass() ); - this.raftLog = raftLog; - } - - @Override - public void start() throws Throwable - { - } - - @Override - public void stop() throws Throwable - { - } - - @Override - public void notifyCommitted( long commitIndex ) - { - coreState.notifyCommitted( commitIndex ); - } - - public void compact() - { - try - { - raftLog.prune( coreState.lastFlushed() ); - } - catch ( IOException e ) - { - // TODO panic? - throw new RuntimeException( e ); - } - catch ( RaftLogCompactedException e ) - { - log.warn( "Log already pruned?", e ); - } - } - - @Override - public synchronized void downloadSnapshot() - { - try - { - downloadSnapshot( selectionStrategy.coreServer() ); - } - catch ( CoreServerSelectionException e ) - { - log.error( "Failed to download snapshot", e ); - } - } - - public synchronized void downloadSnapshot( AdvertisedSocketAddress source ) - { - RetryStrategy.Timeout timeout = new ConstantTimeRetryStrategy( 10, TimeUnit.SECONDS ).newTimeout(); - - localDatabase.stop(); - while ( true ) - { - try - { - performDownload( source ); - localDatabase.start(); - break; - } - catch ( CoreServerSelectionException | StoreCopyFailedException ex ) - { - ex.printStackTrace(); - log.info( ex.getMessage() + ", retrying in %d ms.", timeout.getMillis() ); - try - { - Thread.sleep( timeout.getMillis() ); - timeout.increment(); - } - catch ( InterruptedException e ) - { - log.warn( "Snapshot download interrupted" ); - break; - } - } - catch ( IOException e ) - { - localDatabase.panic( e ); - break; - } - } - } - - private void performDownload( AdvertisedSocketAddress source ) throws CoreServerSelectionException, StoreCopyFailedException - { - log.info( "Server starting, connecting to core server at %s", source.toString() ); - - localDatabase.copyStoreFrom( source, storeFetcher ); - stateFetcher.copyRaftState( source, coreState ); - } -} diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/core/EnterpriseCoreEditionModule.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/core/EnterpriseCoreEditionModule.java index eb4302a7f5ec..381b6cb97541 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/core/EnterpriseCoreEditionModule.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/core/EnterpriseCoreEditionModule.java @@ -25,8 +25,6 @@ import java.io.IOException; import java.io.PrintWriter; import java.util.UUID; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.function.Supplier; import org.neo4j.coreedge.catchup.CatchupServer; @@ -34,6 +32,7 @@ import org.neo4j.coreedge.catchup.DataSourceSupplier; import org.neo4j.coreedge.catchup.StoreIdSupplier; import org.neo4j.coreedge.catchup.storecopy.LocalDatabase; +import org.neo4j.coreedge.catchup.storecopy.StoreCopyFailedException; import org.neo4j.coreedge.catchup.storecopy.StoreFiles; import org.neo4j.coreedge.catchup.storecopy.edge.CopiedStoreRecovery; import org.neo4j.coreedge.catchup.storecopy.edge.StoreCopyClient; @@ -66,6 +65,7 @@ import org.neo4j.coreedge.raft.net.RaftChannelInitializer; import org.neo4j.coreedge.raft.net.RaftOutbound; import org.neo4j.coreedge.raft.replication.LeaderOnlyReplicator; +import org.neo4j.coreedge.raft.replication.ProgressTrackerImpl; import org.neo4j.coreedge.raft.replication.RaftReplicator; import org.neo4j.coreedge.raft.replication.Replicator; import org.neo4j.coreedge.raft.replication.id.ReplicatedIdAllocationStateMachine; @@ -85,9 +85,9 @@ import org.neo4j.coreedge.raft.replication.tx.ReplicatedTransactionStateMachine; import org.neo4j.coreedge.raft.roles.Role; import org.neo4j.coreedge.raft.state.CoreState; +import org.neo4j.coreedge.raft.state.CoreStateDownloader; import org.neo4j.coreedge.raft.state.CoreStateMachines; import org.neo4j.coreedge.raft.state.DurableStateStorage; -import org.neo4j.coreedge.raft.replication.ProgressTrackerImpl; import org.neo4j.coreedge.raft.state.LongIndexMarshal; import org.neo4j.coreedge.raft.state.StateStorage; import org.neo4j.coreedge.raft.state.id_allocation.IdAllocationState; @@ -163,7 +163,7 @@ public class EnterpriseCoreEditionModule private static final String CLUSTER_STATE_DIRECTORY_NAME = "cluster-state"; private final RaftInstance raft; - private final CoreStateMachine coreStateMachine; + private final CoreState coreState; private final CoreMember myself; @Override @@ -179,15 +179,15 @@ public Role currentRole() } @Override - public void downloadSnapshot( AdvertisedSocketAddress source ) + public void downloadSnapshot( AdvertisedSocketAddress source ) throws InterruptedException, StoreCopyFailedException { - coreStateMachine.downloadSnapshot( source ); + coreState.downloadSnapshot( source ); } @Override - public void compact() + public void compact() throws IOException { - coreStateMachine.compact(); + coreState.compact(); } public enum RaftLogImplementation @@ -272,8 +272,6 @@ public EnterpriseCoreEditionModule( final PlatformModule platformModule, MonitoredRaftLog raftLog = new MonitoredRaftLog( underlyingLog, platformModule.monitors ); - CoreState coreState; - LocalDatabase localDatabase = new LocalDatabase( platformModule.storeDir, new CopiedStoreRecovery( config, platformModule.kernelExtensions.listFactories(), platformModule.pageCache ), @@ -326,22 +324,18 @@ fileSystem, new File( clusterStateDirectory, "session-tracker-state" ), "session throw new RuntimeException( e ); } - ExecutorService applyExecutor = Executors.newSingleThreadExecutor(); + CoreStateDownloader downloader = new CoreStateDownloader( localDatabase, storeFetcher, stateFetcher, logProvider ); coreState = new CoreState( - raftLog, applyExecutor, config.get( CoreEdgeClusterSettings.state_machine_flush_window_size ), + raftLog, config.get( CoreEdgeClusterSettings.state_machine_flush_window_size ), databaseHealthSupplier, logProvider, progressTracker, lastFlushedStorage, lastApplyingStorage, - sessionTrackerStorage ); - - coreStateMachine = new CoreStateMachine( coreState, localDatabase, - new NotMyselfSelectionStrategy( discoveryService, myself ), storeFetcher, stateFetcher, - logProvider, raftLog ); + sessionTrackerStorage, new NotMyselfSelectionStrategy( discoveryService, myself ), downloader ); raft = createRaft( life, loggingOutbound, discoveryService, config, messageLogger, raftLog, - coreStateMachine, fileSystem, clusterStateDirectory, myself, logProvider, raftServer, + coreState, fileSystem, clusterStateDirectory, myself, logProvider, raftServer, raftTimeoutService, databaseHealthSupplier, platformModule.monitors ); - life.add( coreStateMachine ); + life.add( coreState ); } catch ( IOException e ) { diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/elections/ElectionPerformanceIT.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/elections/ElectionPerformanceIT.java index bbb111ba944c..fec4d76954c3 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/elections/ElectionPerformanceIT.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/elections/ElectionPerformanceIT.java @@ -68,7 +68,7 @@ public void notifyCommitted( long commitIndex ) } @Override - public void downloadSnapshot() + public void notifyNeedFreshSnapshot() { } diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/state/CoreStateTest.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/state/CoreStateTest.java index 749843b657ab..ed14a1b58ea6 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/state/CoreStateTest.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/state/CoreStateTest.java @@ -21,13 +21,8 @@ import org.junit.Test; -import java.util.List; import java.util.Optional; import java.util.UUID; -import java.util.concurrent.AbstractExecutorService; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; import org.neo4j.coreedge.raft.log.InMemoryRaftLog; import org.neo4j.coreedge.raft.log.RaftLogEntry; @@ -40,6 +35,7 @@ import org.neo4j.coreedge.raft.replication.tx.CoreReplicatedContent; import org.neo4j.coreedge.raft.replication.tx.ReplicatedTransaction; import org.neo4j.coreedge.server.CoreMember; +import org.neo4j.coreedge.server.edge.CoreServerSelectionStrategy; import org.neo4j.kernel.impl.core.DatabasePanicEventGenerator; import org.neo4j.kernel.internal.DatabaseHealth; import org.neo4j.logging.NullLogProvider; @@ -66,8 +62,9 @@ public class CoreStateTest private final GlobalSession globalSession = new GlobalSession<>( UUID.randomUUID(), null ); private final int flushEvery = 10; - private final CoreState coreState = new CoreState( raftLog, new DirectExecutorService(), flushEvery, () -> dbHealth, NullLogProvider.getInstance(), - new ProgressTrackerImpl( globalSession ), lastFlushedStorage, lastApplyingStorage, sessionStorage ); + private final CoreState coreState = new CoreState( raftLog, flushEvery, () -> dbHealth, NullLogProvider.getInstance(), + new ProgressTrackerImpl( globalSession ), lastFlushedStorage, lastApplyingStorage, sessionStorage, + mock( CoreServerSelectionStrategy.class ), mock( CoreStateDownloader.class ) ); private ReplicatedTransaction nullTx = new ReplicatedTransaction( null ); @@ -105,6 +102,7 @@ public void shouldApplyCommittedCommand() throws Exception raftLog.append( new RaftLogEntry( 0, operation( nullTx ) ) ); raftLog.append( new RaftLogEntry( 0, operation( nullTx ) ) ); coreState.notifyCommitted( 2 ); + coreState.syncExecutor( false, false ); // then verify( txStateMachine ).dispatch( nullTx, 0 ); @@ -123,6 +121,7 @@ public void shouldNotApplyUncommittedCommands() throws Exception raftLog.append( new RaftLogEntry( 0, operation( nullTx ) ) ); raftLog.append( new RaftLogEntry( 0, operation( nullTx ) ) ); coreState.notifyCommitted( -1 ); + coreState.syncExecutor( false, false ); // then verify( txStateMachine, times( 0 ) ).dispatch( any( ReplicatedTransaction.class ), anyInt() ); @@ -145,6 +144,7 @@ public void shouldPeriodicallyFlushState() throws Exception // when coreState.notifyCommitted( flushEvery*TIMES ); + coreState.syncExecutor( false, false ); // then verify( txStateMachine, times( TIMES ) ).flush(); @@ -163,46 +163,9 @@ public void shouldPanicIfUnableToApply() throws Exception // when assertEquals( true, dbHealth.isHealthy() ); coreState.notifyCommitted( 0 ); + coreState.syncExecutor( false, false ); // then assertEquals( false, dbHealth.isHealthy() ); } - - private class DirectExecutorService extends AbstractExecutorService - { - @Override - public void shutdown() - { - } - - @Override - public List shutdownNow() - { - return null; - } - - @Override - public boolean isShutdown() - { - return false; - } - - @Override - public boolean isTerminated() - { - return false; - } - - @Override - public boolean awaitTermination( long timeout, TimeUnit unit ) throws InterruptedException - { - return false; - } - - @Override - public void execute( Runnable command ) - { - command.run(); - } - } } diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/scenarios/CoreToCoreCopySnapshotIT.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/scenarios/CoreToCoreCopySnapshotIT.java index b7d0b64262a2..a968b9da573a 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/scenarios/CoreToCoreCopySnapshotIT.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/scenarios/CoreToCoreCopySnapshotIT.java @@ -137,7 +137,11 @@ public void shouldBeAbleToDownloadAfterPruning() throws Exception } ); // when - cluster.coreServers().forEach( CoreGraphDatabase::compact ); + for ( CoreGraphDatabase coreDb : cluster.coreServers() ) + { + coreDb.compact(); + } + int newDbId = 3; cluster.addCoreServerWithServerId( newDbId, 4 ); CoreGraphDatabase newDb = cluster.getCoreServerById( 3 );