Skip to content

Commit

Permalink
Removed the supplier-of-supplier latent bug reported by DG.
Browse files Browse the repository at this point in the history
  • Loading branch information
jimwebber committed Dec 23, 2015
1 parent 0b0e481 commit 7bef578
Showing 1 changed file with 19 additions and 14 deletions.
Expand Up @@ -164,7 +164,8 @@ public EnterpriseCoreEditionModule( final PlatformModule platformModule,
ListenSocketAddress raftListenAddress = config.get( CoreEdgeClusterSettings.raft_listen_address ); ListenSocketAddress raftListenAddress = config.get( CoreEdgeClusterSettings.raft_listen_address );
RaftServer<CoreMember> raftServer = new RaftServer<>( marshall, raftListenAddress, logProvider ); RaftServer<CoreMember> raftServer = new RaftServer<>( marshall, raftListenAddress, logProvider );


final DelayedRenewableTimeoutService raftTimeoutService = new DelayedRenewableTimeoutService( Clock.SYSTEM_CLOCK, logProvider ); final DelayedRenewableTimeoutService raftTimeoutService = new DelayedRenewableTimeoutService( Clock
.SYSTEM_CLOCK, logProvider );


File raftLogsDirectory = createRaftLogsDirectory( platformModule.storeDir, fileSystem ); File raftLogsDirectory = createRaftLogsDirectory( platformModule.storeDir, fileSystem );
NaiveDurableRaftLog raftLog = new NaiveDurableRaftLog( fileSystem, raftLogsDirectory, NaiveDurableRaftLog raftLog = new NaiveDurableRaftLog( fileSystem, raftLogsDirectory,
Expand All @@ -177,8 +178,7 @@ public EnterpriseCoreEditionModule( final PlatformModule platformModule,
life.add( termStore ); life.add( termStore );
life.add( voteStore ); life.add( voteStore );


Supplier<DatabaseHealth> databaseHealthSupplier = (Supplier) () -> dependencies.provideDependency( DatabaseHealth.class ); Supplier<DatabaseHealth> databaseHealthSupplier = dependencies.provideDependency( DatabaseHealth.class );



RaftStorageExceptionHandler raftStorageExceptionHandler = RaftStorageExceptionHandler raftStorageExceptionHandler =
new RaftStorageExceptionHandler( databaseHealthSupplier ); new RaftStorageExceptionHandler( databaseHealthSupplier );
Expand All @@ -193,11 +193,13 @@ public EnterpriseCoreEditionModule( final PlatformModule platformModule,


LocalSessionPool localSessionPool = new LocalSessionPool( myself ); LocalSessionPool localSessionPool = new LocalSessionPool( myself );


ReplicatedLockStateMachine replicatedLockStateMachine = new ReplicatedLockStateMachine( myself, replicator ); ReplicatedLockStateMachine<CoreMember> replicatedLockStateMachine = new ReplicatedLockStateMachine<>( myself, replicator );


commitProcessFactory = createCommitProcessFactory( replicator, localSessionPool, replicatedLockStateMachine, dependencies, SYSTEM_CLOCK ); commitProcessFactory = createCommitProcessFactory( replicator, localSessionPool, replicatedLockStateMachine,
dependencies, SYSTEM_CLOCK );


ReplicatedIdAllocationStateMachine idAllocationStateMachine = new ReplicatedIdAllocationStateMachine( myself, new InMemoryIdAllocationStateStore() ); ReplicatedIdAllocationStateMachine idAllocationStateMachine = new ReplicatedIdAllocationStateMachine( myself,
new InMemoryIdAllocationStateStore() );
replicator.subscribe( idAllocationStateMachine ); replicator.subscribe( idAllocationStateMachine );


// TODO: AllocationChunk should be configurable and per type. The retry timeout should also be configurable. // TODO: AllocationChunk should be configurable and per type. The retry timeout should also be configurable.
Expand All @@ -208,7 +210,7 @@ public EnterpriseCoreEditionModule( final PlatformModule platformModule,


long electionTimeout = config.get( CoreEdgeClusterSettings.leader_election_timeout ); long electionTimeout = config.get( CoreEdgeClusterSettings.leader_election_timeout );
MembershipWaiter<CoreMember> membershipWaiter = MembershipWaiter<CoreMember> membershipWaiter =
new MembershipWaiter<>( myself, platformModule.jobScheduler, electionTimeout*4, logProvider ); new MembershipWaiter<>( myself, platformModule.jobScheduler, electionTimeout * 4, logProvider );


ReplicatedIdGeneratorFactory replicatedIdGeneratorFactory = ReplicatedIdGeneratorFactory replicatedIdGeneratorFactory =
createIdGeneratorFactory( fileSystem, idRangeAcquirer, logProvider ); createIdGeneratorFactory( fileSystem, idRangeAcquirer, logProvider );
Expand Down Expand Up @@ -250,7 +252,8 @@ public EnterpriseCoreEditionModule( final PlatformModule platformModule,
channelInitializer ) ); channelInitializer ) );
channelInitializer.setOwner( coreToCoreClient ); channelInitializer.setOwner( coreToCoreClient );


lockManager = dependencies.satisfyDependency( createLockManager( config, logging, replicator, myself, replicatedLockStateMachine ) ); lockManager = dependencies.satisfyDependency( createLockManager( config, logging, replicator, myself,
replicatedLockStateMachine ) );


CatchupServer catchupServer = new CatchupServer( logProvider, CatchupServer catchupServer = new CatchupServer( logProvider,
new StoreIdSupplier( platformModule ), new StoreIdSupplier( platformModule ),
Expand All @@ -261,13 +264,14 @@ public EnterpriseCoreEditionModule( final PlatformModule platformModule,
config.get( CoreEdgeClusterSettings.transaction_listen_address ) ); config.get( CoreEdgeClusterSettings.transaction_listen_address ) );


life.add( CoreServerStartupProcess.createLifeSupport( life.add( CoreServerStartupProcess.createLifeSupport(
platformModule.dataSourceManager, replicatedIdGeneratorFactory, raft, new RaftLogReplay( raftLog, logProvider ), raftServer, platformModule.dataSourceManager, replicatedIdGeneratorFactory, raft, new RaftLogReplay( raftLog,
logProvider ), raftServer,
catchupServer, raftTimeoutService, membershipWaiter, catchupServer, raftTimeoutService, membershipWaiter,
config.get( CoreEdgeClusterSettings.join_catch_up_timeout ), config.get( CoreEdgeClusterSettings.join_catch_up_timeout ),
new RecoverTransactionLogState( dependencies, logProvider, new RecoverTransactionLogState( dependencies, logProvider,
relationshipTypeTokenHolder, propertyKeyTokenHolder, labelTokenHolder ), relationshipTypeTokenHolder, propertyKeyTokenHolder, labelTokenHolder ),
tokenLife tokenLife
)); ) );
} }


public boolean isLeader() public boolean isLeader()
Expand All @@ -293,7 +297,8 @@ private File createRaftLogsDirectory( File dir, FileSystemAbstraction fileSystem


public static CommitProcessFactory createCommitProcessFactory( final Replicator replicator, public static CommitProcessFactory createCommitProcessFactory( final Replicator replicator,
final LocalSessionPool localSessionPool, final LocalSessionPool localSessionPool,
CurrentReplicatedLockState currentReplicatedLockState, CurrentReplicatedLockState
currentReplicatedLockState,
final Dependencies dependencies, final Dependencies dependencies,
final Clock clock ) final Clock clock )
{ {
Expand Down Expand Up @@ -323,7 +328,7 @@ private static RaftInstance<CoreMember> createRaft( LifeSupport life,
MessageLogger<AdvertisedSocketAddress> messageLogger, MessageLogger<AdvertisedSocketAddress> messageLogger,
RaftLog raftLog, RaftLog raftLog,
TermStore termStore, TermStore termStore,
VoteStore voteStore, VoteStore<CoreMember> voteStore,
CoreMember myself, CoreMember myself,
LogProvider logProvider, LogProvider logProvider,
RaftServer<CoreMember> raftServer, RaftServer<CoreMember> raftServer,
Expand Down Expand Up @@ -406,11 +411,11 @@ protected ReplicatedIdGeneratorFactory createIdGeneratorFactory( FileSystemAbstr
} }


protected Locks createLockManager( final Config config, final LogService logging, final Replicator replicator, protected Locks createLockManager( final Config config, final LogService logging, final Replicator replicator,
CoreMember myself, ReplicatedLockStateMachine replicatedLockStateMachine ) CoreMember myself, ReplicatedLockStateMachine<CoreMember> replicatedLockStateMachine )
{ {
Locks local = CommunityEditionModule.createLockManager( config, logging ); Locks local = CommunityEditionModule.createLockManager( config, logging );


return new LeaderOnlyLockManager( myself, replicator, local, replicatedLockStateMachine ); return new LeaderOnlyLockManager<CoreMember>( myself, replicator, local, replicatedLockStateMachine );
} }


protected TransactionHeaderInformationFactory createHeaderInformationFactory() protected TransactionHeaderInformationFactory createHeaderInformationFactory()
Expand Down

0 comments on commit 7bef578

Please sign in to comment.