Commit 436737a

DatasourceManager lifecycle is now exclusively controlled by LocalDatabase

The DataSourceManager lifecycle was controlled partly by the top-level LifeSupport
and partly by CoreState which, through LocalDatabase, started and stopped the
DataSourceManager to download snapshots. The DataSourceManager is now controlled
only through LocalDatabase, allowing for simpler lifecycle dependencies and future
improvements such as caching the StoreId, blocking message processing, etc.
digitalstain committed Aug 3, 2016
1 parent 8fabad8 commit 436737a
Showing 8 changed files with 60 additions and 32 deletions.
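
The shape of the change, condensed: LocalDatabase now implements Lifecycle and forwards every phase to the single DataSourceManager it owns, so LifeSupport manages only LocalDatabase. A minimal sketch of that delegation pattern (the class name here is hypothetical; the field and method names follow the LocalDatabase hunk below, and the constructor arguments and unrelated members are elided):

```java
import org.neo4j.kernel.impl.transaction.state.DataSourceManager;
import org.neo4j.kernel.lifecycle.Lifecycle;

// Sketch of the delegation now used by LocalDatabase: each Lifecycle phase
// is forwarded to the owned DataSourceManager. Class name is hypothetical.
class LifecycleDelegationSketch implements Lifecycle
{
    private final DataSourceManager dataSourceManager;

    LifecycleDelegationSketch( DataSourceManager dataSourceManager )
    {
        this.dataSourceManager = dataSourceManager;
    }

    @Override
    public void init() throws Throwable
    {
        dataSourceManager.init();
    }

    @Override
    public void start() throws Throwable
    {
        dataSourceManager.start();
    }

    @Override
    public void stop() throws Throwable
    {
        // The real LocalDatabase also clears its cached StoreId here.
        dataSourceManager.stop();
    }

    @Override
    public void shutdown() throws Throwable
    {
        dataSourceManager.shutdown();
    }
}
```

With a single owner for the DataSourceManager, components such as CoreStateDownloader can safely stop and restart the database through LocalDatabase without racing the top-level LifeSupport.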
@@ -79,7 +79,7 @@ public ReplicationModule( MemberId myself, PlatformModule platformModule, Config
         LocalSessionPool sessionPool = new LocalSessionPool( myGlobalSession );
         progressTracker = new ProgressTrackerImpl( myGlobalSession );

-        replicator = new RaftReplicator( consensusModule.raftInstance(), myself,
+        replicator = new RaftReplicator( consensusModule.raftMachine(), myself,
                 loggingOutbound,
                 sessionPool, progressTracker,
                 new ExponentialBackoffStrategy( 10, SECONDS ) );
@@ -28,12 +28,13 @@
 import org.neo4j.kernel.impl.transaction.log.TransactionIdStore;
 import org.neo4j.kernel.impl.transaction.state.DataSourceManager;
 import org.neo4j.kernel.internal.DatabaseHealth;
+import org.neo4j.kernel.lifecycle.Lifecycle;
 import org.neo4j.logging.Log;
 import org.neo4j.logging.LogProvider;

 import static java.lang.String.format;

-public class LocalDatabase implements Supplier<StoreId>
+public class LocalDatabase implements Supplier<StoreId>, Lifecycle
 {
     private final File storeDir;

@@ -60,15 +61,26 @@ public LocalDatabase( File storeDir, CopiedStoreRecovery copiedStoreRecovery, St
         log = logProvider.getLog( getClass() );
     }

-    public void start() throws IOException
+    public void init() throws Throwable
     {
-        dataSourceManager.getDataSource().start();
+        dataSourceManager.init();
     }

-    public void stop()
+    public void start() throws Throwable
+    {
+        dataSourceManager.start();
+    }
+
+    public void stop() throws Throwable
     {
         clearCache();
-        dataSourceManager.getDataSource().stop();
+        dataSourceManager.stop();
     }

+    @Override
+    public void shutdown() throws Throwable
+    {
+        dataSourceManager.shutdown();
+    }
+
     public StoreId storeId()
@@ -19,22 +19,22 @@
  */
 package org.neo4j.coreedge.core;

+import org.neo4j.coreedge.catchup.storecopy.LocalDatabase;
 import org.neo4j.coreedge.core.state.machines.id.ReplicatedIdGeneratorFactory;
 import org.neo4j.coreedge.core.consensus.membership.MembershipWaiterLifecycle;
-import org.neo4j.kernel.impl.transaction.state.DataSourceManager;
 import org.neo4j.kernel.lifecycle.LifeSupport;
 import org.neo4j.kernel.lifecycle.Lifecycle;

 class CoreStartupProcess
 {
-    static LifeSupport createLifeSupport( DataSourceManager dataSourceManager,
+    static LifeSupport createLifeSupport( LocalDatabase localDatabase,
             ReplicatedIdGeneratorFactory idGeneratorFactory,
             Lifecycle raftTimeoutService,
             Lifecycle coreServerStartupLifecycle,
             MembershipWaiterLifecycle membershipWaiterLifecycle )
     {
         LifeSupport services = new LifeSupport();
-        services.add( dataSourceManager );
+        services.add( localDatabase );
         services.add( idGeneratorFactory );
         services.add( coreServerStartupLifecycle );
         services.add( raftTimeoutService );
@@ -118,8 +118,8 @@ public void registerProcedures( Procedures procedures )
         try
         {
             procedures.register( new DiscoverMembersProcedure( discoveryService, logProvider ) );
-            procedures.register( new AcquireEndpointsProcedure( discoveryService, consensusModule.raftInstance(), logProvider ) );
-            procedures.register( new ClusterOverviewProcedure( discoveryService, consensusModule.raftInstance(), logProvider ) );
+            procedures.register( new AcquireEndpointsProcedure( discoveryService, consensusModule.raftMachine(), logProvider ) );
+            procedures.register( new ClusterOverviewProcedure( discoveryService, consensusModule.raftMachine(), logProvider ) );
             procedures.register( new RoleProcedure( CORE ) );
         }
         catch ( ProcedureException e )
@@ -170,7 +170,8 @@ fileSystem, clusterStateDirectory, CORE_MEMBER_ID_NAME, new MemberIdMarshal(), 1
         int maxQueueSize = config.get( CoreEdgeClusterSettings.outgoing_queue_size );

         final SenderService senderService =
-                new SenderService( new RaftChannelInitializer( new CoreReplicatedContentMarshal(), logProvider ), logProvider, platformModule.monitors,
+                new SenderService( new RaftChannelInitializer( new CoreReplicatedContentMarshal(), logProvider ),
+                        logProvider, platformModule.monitors,
                         maxQueueSize, new NonBlockingChannels() );
         life.add( senderService );

@@ -202,14 +203,14 @@ fileSystem, clusterStateDirectory, CORE_MEMBER_ID_NAME, new MemberIdMarshal(), 1
         consensusModule =
                 new ConsensusModule( myself, platformModule, raftOutbound, clusterStateDirectory, discoveryService );

-        dependencies.satisfyDependency( consensusModule.raftInstance() );
+        dependencies.satisfyDependency( consensusModule.raftMachine() );

         ReplicationModule replicationModule = new ReplicationModule( myself, platformModule, config, consensusModule,
                 loggingOutbound, clusterStateDirectory,
                 fileSystem, databaseHealthSupplier, logProvider );

         coreStateMachinesModule = new CoreStateMachinesModule( myself, platformModule, clusterStateDirectory,
-                databaseHealthSupplier, config, replicationModule.getReplicator(), consensusModule.raftInstance(),
+                databaseHealthSupplier, config, replicationModule.getReplicator(), consensusModule.raftMachine(),
                 dependencies, localDatabase );

         this.idGeneratorFactory = coreStateMachinesModule.idGeneratorFactory;
@@ -221,14 +222,16 @@ fileSystem, clusterStateDirectory, CORE_MEMBER_ID_NAME, new MemberIdMarshal(), 1
         this.commitProcessFactory = coreStateMachinesModule.commitProcessFactory;

         CoreServerModule coreServerModule = new CoreServerModule( myself, platformModule, consensusModule,
-                coreStateMachinesModule, replicationModule, clusterStateDirectory, discoveryService, localDatabase, messageLogger );
+                coreStateMachinesModule, replicationModule, clusterStateDirectory, discoveryService, localDatabase,
+                messageLogger );

         editionInvariants( platformModule, dependencies, config, logging, life );

         this.lockManager = dependencies.satisfyDependency( lockManager );

         life.add( CoreStartupProcess.createLifeSupport(
-                platformModule.dataSourceManager, coreStateMachinesModule.replicatedIdGeneratorFactory, coreServerModule.startupLifecycle, consensusModule.raftTimeoutService(), coreServerModule.membershipWaiterLifecycle ) );
+                localDatabase, coreStateMachinesModule.replicatedIdGeneratorFactory, coreServerModule.startupLifecycle,
+                consensusModule.raftTimeoutService(), coreServerModule.membershipWaiterLifecycle ) );
     }

     private void editionInvariants( PlatformModule platformModule, Dependencies dependencies, Config config,
@@ -262,7 +265,7 @@ private void editionInvariants( PlatformModule platformModule, Dependencies depe

     public boolean isLeader()
     {
-        return consensusModule.raftInstance().currentRole() == Role.LEADER;
+        return consensusModule.raftMachine().currentRole() == Role.LEADER;
     }

     private File createClusterStateDirectory( File dir, FileSystemAbstraction fileSystem )
@@ -240,7 +240,7 @@ public RaftLog raftLog()
         return raftLog;
     }

-    public RaftMachine raftInstance()
+    public RaftMachine raftMachine()
     {
         return raftMachine;
     }
@@ -78,8 +78,10 @@ public class CoreServerModule
     public final LifeSupport startupLifecycle;
     public final MembershipWaiterLifecycle membershipWaiterLifecycle;

-    public CoreServerModule( MemberId myself, final PlatformModule platformModule, ConsensusModule consensusModule, CoreStateMachinesModule coreStateMachinesModule, ReplicationModule replicationModule, File clusterStateDirectory, CoreTopologyService
-            discoveryService, LocalDatabase localDatabase, MessageLogger<MemberId> messageLogger )
+    public CoreServerModule( MemberId myself, final PlatformModule platformModule, ConsensusModule consensusModule,
+            CoreStateMachinesModule coreStateMachinesModule, ReplicationModule replicationModule,
+            File clusterStateDirectory, CoreTopologyService discoveryService,
+            LocalDatabase localDatabase, MessageLogger<MemberId> messageLogger )
     {
         final Dependencies dependencies = platformModule.dependencies;
         final Config config = platformModule.config;
@@ -138,11 +140,14 @@ public CoreServerModule( MemberId myself, final PlatformModule platformModule, C
         NotMyselfSelectionStrategy someoneElse = new NotMyselfSelectionStrategy( discoveryService, myself );

         CoreState coreState = new CoreState(
-                consensusModule.raftInstance(), localDatabase,
+                consensusModule.raftMachine(), localDatabase,
                 logProvider,
                 someoneElse, downloader,
-                new CommandApplicationProcess( coreStateMachinesModule.coreStateMachines, consensusModule.raftLog(), config.get( CoreEdgeClusterSettings.state_machine_apply_max_batch_size ),
-                        config.get( CoreEdgeClusterSettings.state_machine_flush_window_size ), databaseHealthSupplier, logProvider, replicationModule.getProgressTracker(), lastFlushedStorage, replicationModule.getSessionTracker(), coreStateApplier,
+                new CommandApplicationProcess( coreStateMachinesModule.coreStateMachines, consensusModule.raftLog(),
+                        config.get( CoreEdgeClusterSettings.state_machine_apply_max_batch_size ),
+                        config.get( CoreEdgeClusterSettings.state_machine_flush_window_size ),
+                        databaseHealthSupplier, logProvider, replicationModule.getProgressTracker(),
+                        lastFlushedStorage, replicationModule.getSessionTracker(), coreStateApplier,
                         inFlightMap, platformModule.monitors ) );

         dependencies.satisfyDependency( coreState );
@@ -162,7 +167,7 @@ public CoreServerModule( MemberId myself, final PlatformModule platformModule, C
                 new MembershipWaiter( myself, platformModule.jobScheduler, electionTimeout * 4, coreState, logProvider );
         long joinCatchupTimeout = config.get( CoreEdgeClusterSettings.join_catch_up_timeout );
         membershipWaiterLifecycle = new MembershipWaiterLifecycle( membershipWaiter,
-                joinCatchupTimeout, consensusModule.raftInstance(), logProvider );
+                joinCatchupTimeout, consensusModule.raftMachine(), logProvider );

         life.add( new ContinuousJob( platformModule.jobScheduler, new JobScheduler.Group( "raft-batch-handler", NEW_THREAD ),
                 batchingMessageHandler ) );
@@ -19,9 +19,7 @@
  */
 package org.neo4j.coreedge.core.state.snapshot;

-import java.io.IOException;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeoutException;

 import org.neo4j.coreedge.catchup.CoreClient;
@@ -42,20 +40,21 @@ public class CoreStateDownloader
     private final CoreClient coreClient;
     private final Log log;

-    public CoreStateDownloader( LocalDatabase localDatabase, StoreFetcher storeFetcher, CoreClient coreClient, LogProvider logProvider )
+    public CoreStateDownloader( LocalDatabase localDatabase, StoreFetcher storeFetcher, CoreClient coreClient,
+            LogProvider logProvider )
     {
         this.localDatabase = localDatabase;
         this.storeFetcher = storeFetcher;
         this.coreClient = coreClient;
         this.log = logProvider.getLog( getClass() );
     }

-    public synchronized void downloadSnapshot( MemberId source, CoreState coreState ) throws InterruptedException, StoreCopyFailedException
+    public synchronized void downloadSnapshot( MemberId source, CoreState coreState )
+            throws InterruptedException, StoreCopyFailedException
     {
-        localDatabase.stop();
-
         try
         {
+            localDatabase.stop();
             log.info( "Downloading snapshot from core server at %s", source );

             /* The core snapshot must be copied before the store, because the store has a dependency on
@@ -91,7 +90,7 @@ public synchronized void downloadSnapshot( MemberId source, CoreState coreState
             localDatabase.start();
             log.info( "Restarted local database", source );
         }
-        catch ( IOException | ExecutionException e )
+        catch ( Throwable e )
         {
             localDatabase.panic( e );
             throw new StoreCopyFailedException( e );
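
A condensed, non-compilable outline of the control-flow change in downloadSnapshot above: stopping the database now happens inside the try block, and the catch is widened to Throwable, so a failure at any point, including during stop() itself, panics the database and surfaces as a StoreCopyFailedException. The elided middle is the snapshot and store copy shown in the hunk.

```java
// Outline only; field and exception types are those from the diff above.
public synchronized void downloadSnapshot( MemberId source, CoreState coreState )
        throws InterruptedException, StoreCopyFailedException
{
    try
    {
        localDatabase.stop();   // quiesce the local store before replacing it
        // ... download the core snapshot, then copy the store from `source` ...
        localDatabase.start();  // come back up on the freshly copied store
    }
    catch ( Throwable e )
    {
        localDatabase.panic( e );                 // any failure marks the database unhealthy
        throw new StoreCopyFailedException( e );  // and is reported to the caller
    }
}
```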
@@ -19,12 +19,17 @@
  */
 package org.neo4j.coreedge.core;

+import java.io.File;
 import java.util.List;
+import java.util.function.Supplier;

 import org.hamcrest.Description;
 import org.hamcrest.TypeSafeMatcher;
 import org.junit.Test;

+import org.neo4j.coreedge.catchup.storecopy.CopiedStoreRecovery;
+import org.neo4j.coreedge.catchup.storecopy.LocalDatabase;
+import org.neo4j.coreedge.catchup.storecopy.StoreFiles;
 import org.neo4j.coreedge.core.consensus.schedule.DelayedRenewableTimeoutService;
 import org.neo4j.coreedge.core.consensus.RaftServer;
 import org.neo4j.coreedge.core.state.machines.id.ReplicatedIdGeneratorFactory;
@@ -33,6 +38,7 @@
 import org.neo4j.kernel.impl.transaction.state.DataSourceManager;
 import org.neo4j.kernel.lifecycle.LifeSupport;
 import org.neo4j.kernel.lifecycle.Lifecycle;
+import org.neo4j.logging.LogProvider;

 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.mockito.Mockito.mock;
@@ -45,13 +51,16 @@ public class CoreStartupProcessTest
     public void raftTimeOutServiceTriggersMessagesSentToAnotherServer() throws Exception
     {
         DataSourceManager dataSourceManager = mock( DataSourceManager.class );
+        LocalDatabase localDatabase = new LocalDatabase( new File(""), mock( CopiedStoreRecovery.class ),
+                mock( StoreFiles.class ), dataSourceManager, mock( Supplier.class ), mock( Supplier.class ),
+                mock( LogProvider.class ) );
         ReplicatedIdGeneratorFactory idGeneratorFactory = mock( ReplicatedIdGeneratorFactory.class );
         RaftServer raftServer = mock( RaftServer.class );
         LifeSupport coreServer = mock( LifeSupport.class );
         DelayedRenewableTimeoutService raftTimeoutService = mock( DelayedRenewableTimeoutService.class );
         MembershipWaiterLifecycle membershipWaiter = mock( MembershipWaiterLifecycle.class );

-        LifeSupport lifeSupport = CoreStartupProcess.createLifeSupport( dataSourceManager,
+        LifeSupport lifeSupport = CoreStartupProcess.createLifeSupport( localDatabase,
                 idGeneratorFactory, raftTimeoutService, coreServer, membershipWaiter );

         assertThat( lifeSupport, startsComponent( raftTimeoutService ).after( raftServer )
