Skip to content

Commit

Permalink
core-edge: fix up a few generics and some mess in ecem
Browse files Browse the repository at this point in the history
The generics were causing warnings and the instantiation of storage
in EnterpriseCoreEditionModule was consolidated.
  • Loading branch information
martinfurmanski committed Jun 30, 2016
1 parent 62bd4f5 commit 4d6b24d
Show file tree
Hide file tree
Showing 7 changed files with 80 additions and 108 deletions.
Expand Up @@ -62,17 +62,16 @@ public class RaftMembershipManager implements RaftMembership, MembershipDriver
private int uncommittedMemberChanges = 0;

private final SendToMyself replicator;
private final RaftGroup.Builder memberSetBuilder;
private final RaftGroup.Builder<CoreMember> memberSetBuilder;
private final ReadableRaftLog entryLog;
private final Log log;
private final int expectedClusterSize;
private final StateStorage<RaftMembershipState> stateStorage;
private final RaftMembershipState raftMembershipState;
private long lastApplied = -1;

public RaftMembershipManager( SendToMyself replicator, RaftGroup.Builder memberSetBuilder, RaftLog entryLog,
LogProvider logProvider, int expectedClusterSize, long electionTimeout,
Clock clock, long catchupTimeout,
public RaftMembershipManager( SendToMyself replicator, RaftGroup.Builder<CoreMember> memberSetBuilder, RaftLog entryLog,
LogProvider logProvider, int expectedClusterSize, long electionTimeout, Clock clock, long catchupTimeout,
StateStorage<RaftMembershipState> stateStorage )
{
this.replicator = replicator;
Expand Down
Expand Up @@ -112,7 +112,7 @@ void onRaftGroupCommitted()
handleState( state.onRaftGroupCommitted() );
}

void onFollowerStateChange( FollowerStates followerStates )
void onFollowerStateChange( FollowerStates<CoreMember> followerStates )
{
handleState( state.onFollowerStateChange( followerStates ) );
}
Expand All @@ -127,7 +127,7 @@ void onSuperfluousMember( CoreMember member )
handleState( state.onSuperfluousMember( member ) );
}

void onTargetChanged( Set targetMembers )
void onTargetChanged( Set<CoreMember> targetMembers )
{
handleState( state.onTargetChanged( targetMembers ) );
}
Expand Down Expand Up @@ -185,7 +185,7 @@ public RaftMembershipStateMachineEventHandler onMissingMember( CoreMember member
@Override
public RaftMembershipStateMachineEventHandler onSuperfluousMember( CoreMember member )
{
Set updatedVotingMembers = new HashSet<>( membershipState.votingMembers() );
Set<CoreMember> updatedVotingMembers = new HashSet<>( membershipState.votingMembers() );
updatedVotingMembers.remove( member );
membershipDriver.doConsensus( updatedVotingMembers );

Expand Down Expand Up @@ -241,15 +241,15 @@ public RaftMembershipStateMachineEventHandler onRole( Role role )
}

@Override
public RaftMembershipStateMachineEventHandler onFollowerStateChange( FollowerStates followerStates )
public RaftMembershipStateMachineEventHandler onFollowerStateChange( FollowerStates<CoreMember> followerStates )
{
catchupGoalTracker.updateProgress( followerStates.get( catchingUpMember ) );

if ( catchupGoalTracker.isFinished() )
{
if ( catchupGoalTracker.isGoalAchieved() )
{
Set updatedVotingMembers = new HashSet<>( membershipState.votingMembers() );
Set<CoreMember> updatedVotingMembers = new HashSet<>( membershipState.votingMembers() );
updatedVotingMembers.add( catchingUpMember );
membershipDriver.doConsensus( updatedVotingMembers );

Expand Down
Expand Up @@ -31,13 +31,13 @@ interface RaftMembershipStateMachineEventHandler

RaftMembershipStateMachineEventHandler onRaftGroupCommitted();

RaftMembershipStateMachineEventHandler onFollowerStateChange( FollowerStates followerStates );
RaftMembershipStateMachineEventHandler onFollowerStateChange( FollowerStates<CoreMember> followerStates );

RaftMembershipStateMachineEventHandler onMissingMember( CoreMember member );

RaftMembershipStateMachineEventHandler onSuperfluousMember( CoreMember member );

RaftMembershipStateMachineEventHandler onTargetChanged( Set targetMembers );
RaftMembershipStateMachineEventHandler onTargetChanged( Set<CoreMember> targetMembers );

void onExit();

Expand Down Expand Up @@ -70,7 +70,7 @@ public RaftMembershipStateMachineEventHandler onSuperfluousMember( CoreMember me
}

@Override
public RaftMembershipStateMachineEventHandler onFollowerStateChange( FollowerStates followerStates )
public RaftMembershipStateMachineEventHandler onFollowerStateChange( FollowerStates<CoreMember> followerStates )
{
return this;
}
Expand Down
Expand Up @@ -34,7 +34,7 @@

public class Appending
{
static void handleAppendEntriesRequest( ReadableRaftState state, Outcome outcome,
static void handleAppendEntriesRequest( ReadableRaftState state, Outcome outcome,
RaftMessages.AppendEntries.Request request ) throws IOException
{
if ( request.leaderTerm() < state.term() )
Expand Down
Expand Up @@ -63,6 +63,12 @@ public synchronized void setVotingMembers( Set<CoreMember> newVotingMembers )
notifyListeners();
}

/**
* Adds an additional member to replicate to. Members that are joining need to
* catch up sufficiently before they become part of the voting group.
*
* @param member The member which will be added to the replication group.
*/
public synchronized void addAdditionalReplicationMember( CoreMember member )
{
additionalReplicationMembers.add( member );
Expand All @@ -71,6 +77,14 @@ public synchronized void addAdditionalReplicationMember( CoreMember member )
notifyListeners();
}

/**
* Removes a member previously part of the additional replication member group.
*
* This either happens because they caught up sufficiently and became part of the
* voting group or because they failed to catch up in time.
*
* @param member The member to remove from the replication group.
*/
public synchronized void removeAdditionalReplicationMember( CoreMember member )
{
additionalReplicationMembers.remove( member );
Expand Down
Expand Up @@ -94,7 +94,7 @@
import org.neo4j.coreedge.raft.state.LongIndexMarshal;
import org.neo4j.coreedge.raft.state.StateStorage;
import org.neo4j.coreedge.raft.state.id_allocation.IdAllocationState;
import org.neo4j.coreedge.raft.membership.RaftMembershipState;
import org.neo4j.coreedge.raft.state.membership.RaftMembershipState;
import org.neo4j.coreedge.raft.state.term.MonitoredTermStateStorage;
import org.neo4j.coreedge.raft.state.term.TermState;
import org.neo4j.coreedge.raft.state.vote.VoteState;
Expand Down Expand Up @@ -208,9 +208,14 @@ public void registerProcedures( Procedures procedures )
final Supplier<DatabaseHealth> databaseHealthSupplier = dependencies.provideDependency( DatabaseHealth.class );

CoreMember myself;
StateStorage<IdAllocationState> idAllocationState;
StateStorage<Long> lastFlushedStorage;
StateStorage<GlobalSessionTrackerState> sessionTrackerStorage;
StateStorage<ReplicatedLockTokenState> lockTokenState;

try
{
DurableStateStorage<CoreMember> idStorage = life.add( new DurableStateStorage<>(
StateStorage<CoreMember> idStorage = life.add( new DurableStateStorage<>(
fileSystem, clusterStateDirectory, "raft-member-id", new CoreMemberMarshal(), 1,
databaseHealthSupplier, logProvider ) );
CoreMember member = idStorage.getInitialState();
Expand All @@ -220,6 +225,29 @@ public void registerProcedures( Procedures procedures )
idStorage.persistStoreData( member );
}
myself = member;

lastFlushedStorage = life.add(
new DurableStateStorage<>( fileSystem, new File( clusterStateDirectory, "last-flushed-state" ),
"last-flushed", new LongIndexMarshal(), config.get( CoreEdgeClusterSettings.last_flushed_state_size ),
databaseHealthSupplier, logProvider ) );

sessionTrackerStorage = life.add( new DurableStateStorage<>( fileSystem,
new File( clusterStateDirectory, "session-tracker-state" ), "session-tracker",
new GlobalSessionTrackerState.Marshal( new CoreMemberMarshal() ),
config.get( CoreEdgeClusterSettings.global_session_tracker_state_size ), databaseHealthSupplier,
logProvider ) );

lockTokenState = life.add(
new DurableStateStorage<>( fileSystem, new File( clusterStateDirectory, "lock-token-state" ),
"lock-token", new ReplicatedLockTokenState.Marshal( new CoreMemberMarshal() ),
config.get( CoreEdgeClusterSettings.replicated_lock_token_state_size ),
databaseHealthSupplier, logProvider ) );

idAllocationState = life.add(
new DurableStateStorage<>( fileSystem, new File( clusterStateDirectory, "id-allocation-state" ),
"id-allocation", new IdAllocationState.Marshal(),
config.get( CoreEdgeClusterSettings.id_alloc_state_size ), databaseHealthSupplier,
logProvider ) );
}
catch ( IOException e )
{
Expand Down Expand Up @@ -289,91 +317,37 @@ public void registerProcedures( Procedures procedures )

RaftServer raftServer;
CoreState coreState;
try
{
DurableStateStorage<Long> lastFlushedStorage = life.add(
new DurableStateStorage<>( fileSystem, new File( clusterStateDirectory, "last-flushed-state" ),
"last-flushed", new LongIndexMarshal(),
config.get( CoreEdgeClusterSettings.last_flushed_state_size ), databaseHealthSupplier,
logProvider ) );

StateStorage<GlobalSessionTrackerState> sessionTrackerStorage;
try
{
sessionTrackerStorage = life.add( new DurableStateStorage<>( fileSystem,
new File( clusterStateDirectory, "session-tracker-state" ), "session-tracker",
new GlobalSessionTrackerState.Marshal( new CoreMemberMarshal() ),
config.get( CoreEdgeClusterSettings.global_session_tracker_state_size ), databaseHealthSupplier,
logProvider ) );
}
catch ( IOException e )
{
throw new RuntimeException( e );
}

CoreStateApplier applier = new CoreStateApplier( logProvider );
CoreStateDownloader downloader =
new CoreStateDownloader( localDatabase, storeFetcher, coreToCoreClient, logProvider );
CoreStateApplier coreStateApplier = new CoreStateApplier( logProvider );
CoreStateDownloader downloader =
new CoreStateDownloader( localDatabase, storeFetcher, coreToCoreClient, logProvider );

InFlightMap<Long,RaftLogEntry> inFlightMap = new InFlightMap<>();
InFlightMap<Long,RaftLogEntry> inFlightMap = new InFlightMap<>();

NotMyselfSelectionStrategy someoneElse = new NotMyselfSelectionStrategy( discoveryService, myself );
NotMyselfSelectionStrategy someoneElse = new NotMyselfSelectionStrategy( discoveryService, myself );

coreState = dependencies.satisfyDependency( new CoreState(
raftLog, config.get( CoreEdgeClusterSettings.state_machine_apply_max_batch_size ),
config.get( CoreEdgeClusterSettings.state_machine_flush_window_size ),
databaseHealthSupplier, logProvider, progressTracker, lastFlushedStorage,
sessionTrackerStorage, someoneElse, applier, downloader, inFlightMap, platformModule.monitors ) );
coreState = dependencies.satisfyDependency( new CoreState(
raftLog, config.get( CoreEdgeClusterSettings.state_machine_apply_max_batch_size ),
config.get( CoreEdgeClusterSettings.state_machine_flush_window_size ),
databaseHealthSupplier, logProvider, progressTracker, lastFlushedStorage,
sessionTrackerStorage, someoneElse, coreStateApplier, downloader, inFlightMap, platformModule.monitors ) );

raftServer = new RaftServer( marshal, raftListenAddress, localDatabase, logProvider, coreState );
raftServer = new RaftServer( marshal, raftListenAddress, localDatabase, logProvider, coreState );

raft = dependencies.satisfyDependency( createRaft( life, loggingOutbound, discoveryService, config,
messageLogger, raftLog, coreState, fileSystem, clusterStateDirectory, myself, logProvider,
raftServer, raftTimeoutService, databaseHealthSupplier, inFlightMap, platformModule.monitors,
platformModule.jobScheduler, localDatabase ) );
raft = dependencies.satisfyDependency( createRaft( life, loggingOutbound, discoveryService, config,
messageLogger, raftLog, coreState, fileSystem, clusterStateDirectory, myself, logProvider,
raftServer, raftTimeoutService, databaseHealthSupplier, inFlightMap, platformModule.monitors,
platformModule.jobScheduler ) );

life.add( new PruningScheduler( coreState, platformModule.jobScheduler,
config.get( CoreEdgeClusterSettings.raft_log_pruning_frequency ) ) );
}
catch ( IOException e )
{
throw new RuntimeException( e );
}
life.add( new PruningScheduler( coreState, platformModule.jobScheduler,
config.get( CoreEdgeClusterSettings.raft_log_pruning_frequency ) ) );

RaftReplicator replicator =
new RaftReplicator( raft, myself,
loggingOutbound,
sessionPool, progressTracker,
new ExponentialBackoffStrategy( 10, SECONDS ) );

StateStorage<ReplicatedLockTokenState> lockTokenState;
try
{
lockTokenState = life.add(
new DurableStateStorage<>( fileSystem, new File( clusterStateDirectory, "lock-token-state" ),
"lock-token", new ReplicatedLockTokenState.Marshal( new CoreMemberMarshal() ),
config.get( CoreEdgeClusterSettings.replicated_lock_token_state_size ),
databaseHealthSupplier, logProvider ) );
}
catch ( IOException e )
{
throw new RuntimeException( e );
}

final StateStorage<IdAllocationState> idAllocationState;
try
{
idAllocationState = life.add(
new DurableStateStorage<>( fileSystem, new File( clusterStateDirectory, "id-allocation-state" ),
"id-allocation", new IdAllocationState.Marshal(),
config.get( CoreEdgeClusterSettings.id_alloc_state_size ), databaseHealthSupplier,
logProvider ) );
}
catch ( IOException e )
{
throw new RuntimeException( e );
}

ReplicatedIdAllocationStateMachine idAllocationStateMachine =
new ReplicatedIdAllocationStateMachine( idAllocationState );

Expand Down Expand Up @@ -545,40 +519,28 @@ private static RaftInstance createRaft( LifeSupport life,
FileSystemAbstraction fileSystem, File clusterStateDirectory, CoreMember myself, LogProvider logProvider,
RaftServer raftServer, DelayedRenewableTimeoutService raftTimeoutService,
Supplier<DatabaseHealth> databaseHealthSupplier, InFlightMap<Long,RaftLogEntry> inFlightMap,
Monitors monitors, JobScheduler jobScheduler, LocalDatabase localDatabase )
Monitors monitors, JobScheduler jobScheduler )
{
StateStorage<TermState> termState;
StateStorage<VoteState> voteState;
StateStorage<RaftMembershipState> raftMembershipStorage;

try
{
StateStorage<TermState> durableTermState = life.add(
new DurableStateStorage<>( fileSystem, new File( clusterStateDirectory, "term-state" ),
"term-state", new TermState.Marshal(),
config.get( CoreEdgeClusterSettings.term_state_size ), databaseHealthSupplier,
logProvider ) );

termState = new MonitoredTermStateStorage( durableTermState, monitors );
}
catch ( IOException e )
{
throw new RuntimeException( e );
}

StateStorage<VoteState> voteState;
try
{
voteState = life.add(
new DurableStateStorage<>( fileSystem, new File( clusterStateDirectory, "vote-state" ),
"vote-state", new VoteState.Marshal( new CoreMemberMarshal() ),
config.get( CoreEdgeClusterSettings.vote_state_size ), databaseHealthSupplier,
logProvider ) );
}
catch ( IOException e )
{
throw new RuntimeException( e );
}

StateStorage<RaftMembershipState> raftMembershipStorage;
try
{
raftMembershipStorage = life.add(
new DurableStateStorage<>( fileSystem, new File( clusterStateDirectory, "membership-state" ),
"membership-state", new RaftMembershipState.Marshal( new CoreMemberMarshal() ),
Expand Down Expand Up @@ -606,8 +568,7 @@ private static RaftInstance createRaft( LifeSupport life,
RaftMembershipManager raftMembershipManager =
new RaftMembershipManager( leaderOnlyReplicator, memberSetBuilder, raftLog, logProvider,
expectedClusterSize, electionTimeout, systemUTC(),
config.get( CoreEdgeClusterSettings.join_catch_up_timeout ), raftMembershipStorage
);
config.get( CoreEdgeClusterSettings.join_catch_up_timeout ), raftMembershipStorage );

RaftLogShippingManager logShipping =
new RaftLogShippingManager( raftOutbound, logProvider, raftLog, systemUTC(),
Expand Down Expand Up @@ -660,8 +621,7 @@ private static PrintWriter raftMessagesLog( File storeDir )

private SchemaWriteGuard createSchemaWriteGuard()
{
return () -> {
};
return () -> {};
}

private KernelData createKernelData( FileSystemAbstraction fileSystem, PageCache pageCache, File storeDir,
Expand Down
Expand Up @@ -33,7 +33,6 @@
import java.io.File;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.function.IntFunction;

import static org.neo4j.helpers.collection.MapUtil.stringMap;
Expand Down

0 comments on commit 4d6b24d

Please sign in to comment.