Skip to content

Commit

Permalink
core-edge: various cleanup, mostly removing leftovers and fixing gene…
Browse files Browse the repository at this point in the history
…rics
  • Loading branch information
martinfurmanski committed Jun 28, 2016
1 parent c6be898 commit 65ce44b
Show file tree
Hide file tree
Showing 40 changed files with 216 additions and 293 deletions.
@@ -0,0 +1,40 @@
/*
* Copyright (c) 2002-2016 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.coreedge.raft;

import org.neo4j.coreedge.server.CoreMember;

class EmptyStateMachine implements RaftStateMachine
{
@Override
public void notifyCommitted( long commitIndex )
{
}

@Override
public void notifyNeedFreshSnapshot()
{
}

@Override
public void downloadSnapshot( CoreMember from )
{
}
}
Expand Up @@ -26,7 +26,7 @@ public interface LeaderLocator
{
CoreMember getLeader() throws NoLeaderFoundException;

void registerListener( Listener listener );
void registerListener( Listener<CoreMember> listener );

void unregisterListener( Listener listener );
void unregisterListener( Listener<CoreMember> listener );
}
Expand Up @@ -27,8 +27,6 @@
import java.util.function.Predicate;
import java.util.function.Supplier;

import org.neo4j.coreedge.catchup.storecopy.LocalDatabase;
import org.neo4j.coreedge.discovery.CoreTopologyService;
import org.neo4j.coreedge.helper.VolatileFuture;
import org.neo4j.coreedge.raft.log.RaftLog;
import org.neo4j.coreedge.raft.log.RaftLogEntry;
Expand All @@ -47,9 +45,6 @@
import org.neo4j.coreedge.raft.state.term.TermState;
import org.neo4j.coreedge.raft.state.vote.VoteState;
import org.neo4j.coreedge.server.CoreMember;
import org.neo4j.coreedge.server.core.LeaderOnlySelectionStrategy;
import org.neo4j.coreedge.server.core.NotMyselfSelectionStrategy;
import org.neo4j.coreedge.server.edge.CoreServerSelectionStrategy;
import org.neo4j.kernel.impl.util.Listener;
import org.neo4j.kernel.internal.DatabaseHealth;
import org.neo4j.kernel.monitoring.Monitors;
Expand Down Expand Up @@ -101,27 +96,24 @@ public enum Timeouts implements RenewableTimeoutService.TimeoutName
private final long electionTimeout;

private final Supplier<DatabaseHealth> databaseHealthSupplier;
private final LocalDatabase localDatabase;
private final VolatileFuture<CoreMember> volatileLeader = new VolatileFuture<>( null );

private final CoreServerSelectionStrategy defaultStrategy;
private final Outbound<CoreMember, RaftMessages.RaftMessage> outbound;
private final Log log;
private Role currentRole = Role.FOLLOWER;

private RaftLogShippingManager logShipping;

public RaftInstance( CoreMember myself, StateStorage<TermState> termStorage,
StateStorage<VoteState> voteStorage, RaftLog entryLog,
RaftStateMachine raftStateMachine, long electionTimeout, long heartbeatInterval,
RenewableTimeoutService renewableTimeoutService,
CoreTopologyService discoveryService,
Outbound<CoreMember, RaftMessages.RaftMessage> outbound,
LogProvider logProvider, RaftMembershipManager membershipManager,
RaftLogShippingManager logShipping,
Supplier<DatabaseHealth> databaseHealthSupplier,
InFlightMap<Long, RaftLogEntry> inFlightMap,
Monitors monitors, LocalDatabase localDatabase )
StateStorage<VoteState> voteStorage, RaftLog entryLog,
RaftStateMachine raftStateMachine, long electionTimeout, long heartbeatInterval,
RenewableTimeoutService renewableTimeoutService,
Outbound<CoreMember,RaftMessages.RaftMessage> outbound,
LogProvider logProvider, RaftMembershipManager membershipManager,
RaftLogShippingManager logShipping,
Supplier<DatabaseHealth> databaseHealthSupplier,
InFlightMap<Long,RaftLogEntry> inFlightMap,
Monitors monitors )
{
this.myself = myself;
this.entryLog = entryLog;
Expand All @@ -130,12 +122,10 @@ public RaftInstance( CoreMember myself, StateStorage<TermState> termStorage,
this.heartbeatInterval = heartbeatInterval;

this.renewableTimeoutService = renewableTimeoutService;
this.defaultStrategy = new NotMyselfSelectionStrategy( discoveryService, myself );

this.outbound = outbound;
this.logShipping = logShipping;
this.databaseHealthSupplier = databaseHealthSupplier;
this.localDatabase = localDatabase;
this.log = logProvider.getLog( getClass() );

this.membershipManager = membershipManager;
Expand Down Expand Up @@ -212,7 +202,7 @@ public CoreMember getLeader() throws NoLeaderFoundException
return waitForLeader( 0, member -> member != null );
}

private CoreMember waitForLeader( long timeoutMillis, Predicate predicate ) throws NoLeaderFoundException
private CoreMember waitForLeader( long timeoutMillis, Predicate<CoreMember> predicate ) throws NoLeaderFoundException
{
try
{
Expand All @@ -232,10 +222,10 @@ private CoreMember waitForLeader( long timeoutMillis, Predicate predicate ) thro
}
}

private Collection<Listener> leaderListeners = new ArrayList<>();
private Collection<Listener<CoreMember>> leaderListeners = new ArrayList<>();

@Override
public synchronized void registerListener( Listener listener )
public synchronized void registerListener( Listener<CoreMember> listener )
{
leaderListeners.add( listener );
listener.receive( state.leader() );
Expand All @@ -256,16 +246,13 @@ private void checkForSnapshotNeed( Outcome outcome )
{
if ( outcome.needsFreshSnapshot() )
{
CoreServerSelectionStrategy strategy = outcome.isProcessable()
? defaultStrategy
: new LeaderOnlySelectionStrategy( outcome );
raftStateMachine.notifyNeedFreshSnapshot( strategy );
raftStateMachine.notifyNeedFreshSnapshot();
}
}

private void notifyLeaderChanges( Outcome outcome )
{
for ( Listener listener : leaderListeners )
for ( Listener<CoreMember> listener : leaderListeners )
{
listener.receive( outcome.getLeader() );
}
Expand Down Expand Up @@ -314,7 +301,7 @@ public synchronized void handle( RaftMessages.RaftMessage incomingMessage )
{
try
{
Outcome outcome = currentRole.handler.handle( incomingMessage, state, log, localDatabase );
Outcome outcome = currentRole.handler.handle( incomingMessage, state, log );

boolean newLeaderWasElected = leaderChanged( outcome, state.leader() );
boolean newCommittedEntry = outcome.getCommitIndex() > state.commitIndex();
Expand Down Expand Up @@ -410,7 +397,7 @@ public String toString()

public static class BootstrapException extends Exception
{
public BootstrapException( Throwable cause )
BootstrapException( Throwable cause )
{
super( cause );
}
Expand All @@ -426,12 +413,12 @@ private long randomTimeoutRange()
return electionTimeout;
}

public Set votingMembers()
public Set<CoreMember> votingMembers()
{
return membershipManager.votingMembers();
}

public Set replicationMembers()
public Set<CoreMember> replicationMembers()
{
return membershipManager.replicationMembers();
}
Expand Down
Expand Up @@ -21,15 +21,11 @@

import java.io.IOException;

import org.neo4j.coreedge.catchup.storecopy.LocalDatabase;
import org.neo4j.coreedge.raft.outcome.Outcome;
import org.neo4j.coreedge.raft.state.RaftState;
import org.neo4j.coreedge.raft.state.ReadableRaftState;
import org.neo4j.kernel.impl.store.StoreId;
import org.neo4j.logging.Log;

public interface RaftMessageHandler
{
Outcome handle( RaftMessages.RaftMessage message, ReadableRaftState context, Log log, LocalDatabase localDatabase )
throws IOException;
Outcome handle( RaftMessages.RaftMessage message, ReadableRaftState context, Log log ) throws IOException;
}
Expand Up @@ -148,7 +148,7 @@ protected void channelRead0( ChannelHandlerContext channelHandlerContext,
{
if ( localDatabase.isEmpty() )
{
raftStateMachine.notifyNeedFreshSnapshot( message::from );
raftStateMachine.downloadSnapshot( message.from() );
}
else
{
Expand Down
Expand Up @@ -19,7 +19,7 @@
*/
package org.neo4j.coreedge.raft;

import org.neo4j.coreedge.server.edge.CoreServerSelectionStrategy;
import org.neo4j.coreedge.server.CoreMember;

/**
* The RAFT external entity that is interested in log entries and
Expand All @@ -30,14 +30,15 @@ public interface RaftStateMachine
/**
* Called when the highest committed index increases.
*/
default void notifyCommitted( long commitIndex ) {}
void notifyCommitted( long commitIndex );

/**
* Download and install a snapshot of state from another member of the cluster.
* <p/>
* Called when the consensus system no longer has the log entries required to
* further update the state machine, because they have been deleted through pruning.
* @param strategy the strategy on how to pick a core to download from
*/
default void notifyNeedFreshSnapshot( CoreServerSelectionStrategy strategy ) {}
void notifyNeedFreshSnapshot();

void downloadSnapshot( CoreMember from );
}
Expand Up @@ -26,7 +26,6 @@
import java.util.HashSet;
import java.util.Set;

import org.neo4j.coreedge.catchup.storecopy.LocalDatabase;
import org.neo4j.coreedge.raft.log.RaftLog;
import org.neo4j.coreedge.raft.log.RaftLogCursor;
import org.neo4j.coreedge.raft.log.ReadableRaftLog;
Expand Down Expand Up @@ -68,22 +67,19 @@ public class RaftMembershipManager implements RaftMembership, MembershipDriver
private final Log log;
private final int expectedClusterSize;
private final StateStorage<RaftMembershipState> stateStorage;
private final LocalDatabase localDatabase;
private final RaftMembershipState raftMembershipState;
private long lastApplied = -1;

public RaftMembershipManager( SendToMyself replicator, RaftGroup.Builder memberSetBuilder, RaftLog entryLog,
LogProvider logProvider, int expectedClusterSize, long electionTimeout,
Clock clock, long catchupTimeout,
StateStorage<RaftMembershipState> stateStorage,
LocalDatabase localDatabase)
LogProvider logProvider, int expectedClusterSize, long electionTimeout,
Clock clock, long catchupTimeout,
StateStorage<RaftMembershipState> stateStorage )
{
this.replicator = replicator;
this.memberSetBuilder = memberSetBuilder;
this.entryLog = entryLog;
this.expectedClusterSize = expectedClusterSize;
this.stateStorage = stateStorage;
this.localDatabase = localDatabase;
this.raftMembershipState = stateStorage.getInitialState();
this.log = logProvider.getLog( getClass() );

Expand All @@ -97,7 +93,7 @@ public void processLog( long commitIndex, Collection<LogCommand> logCommands ) t
{
if ( logCommand instanceof TruncateLogCommand )
{
onTruncated(commitIndex);
onTruncated( commitIndex );
}
if ( logCommand instanceof AppendLogEntry )
{
Expand Down Expand Up @@ -201,9 +197,9 @@ private Pair<Long,RaftGroup<CoreMember>> findLastMembershipEntry() throws IOExce
{
Pair<Long,RaftGroup<CoreMember>> lastMembershipEntry = null;
long index = 0;
try( RaftLogCursor cursor = entryLog.getEntryCursor( index ) )
try ( RaftLogCursor cursor = entryLog.getEntryCursor( index ) )
{
while( cursor.next() )
while ( cursor.next() )
{
ReplicatedContent content = cursor.get().content();
if ( content instanceof RaftGroup )
Expand Down
Expand Up @@ -54,8 +54,6 @@ public class Outcome implements Message

private long commitIndex;

private boolean processable;

/* Follower */
private CoreMember votedFor;
private boolean renewElectionTimeout;
Expand All @@ -66,7 +64,7 @@ public class Outcome implements Message
private long lastLogIndexBeforeWeBecameLeader;

/* Leader */
private FollowerStates followerStates;
private FollowerStates<CoreMember> followerStates;
private Collection<ShipCommand> shipCommands = new ArrayList<>();
private boolean electedLeader;
private boolean steppingDown;
Expand All @@ -78,7 +76,7 @@ public Outcome( Role currentRole, ReadableRaftState ctx )

public Outcome( Role nextRole, long term, CoreMember leader, long leaderCommit, CoreMember votedFor,
Set<CoreMember> votesForMe, long lastLogIndexBeforeWeBecameLeader,
FollowerStates followerStates, boolean renewElectionTimeout,
FollowerStates<CoreMember> followerStates, boolean renewElectionTimeout,
Collection<LogCommand> logCommands, Collection<RaftMessages.Directed> outgoingMessages,
Collection<ShipCommand> shipCommands, long commitIndex )
{
Expand Down Expand Up @@ -110,7 +108,6 @@ private void defaults( Role currentRole, ReadableRaftState ctx )
votedFor = ctx.votedFor();
renewElectionTimeout = false;
needsFreshSnapshot = false;
processable = true;

votesForMe = (currentRole == Role.CANDIDATE) ? new HashSet<>( ctx.votesForMe() ) : new HashSet<>();

Expand Down Expand Up @@ -165,16 +162,6 @@ public void markNeedForFreshSnapshot()
this.needsFreshSnapshot = true;
}

public void markUnprocessable()
{
this.processable = false;
}

public boolean isProcessable()
{
return processable;
}

public void addVoteForMe( CoreMember voteFrom )
{
this.votesForMe.add( voteFrom );
Expand All @@ -185,7 +172,7 @@ public void setLastLogIndexBeforeWeBecameLeader( long lastLogIndexBeforeWeBecame
this.lastLogIndexBeforeWeBecameLeader = lastLogIndexBeforeWeBecameLeader;
}

public void replaceFollowerStates( FollowerStates followerStates )
public void replaceFollowerStates( FollowerStates<CoreMember> followerStates )
{
this.followerStates = followerStates;
}
Expand Down Expand Up @@ -274,7 +261,7 @@ public boolean needsFreshSnapshot()
return needsFreshSnapshot;
}

public Set getVotesForMe()
public Set<CoreMember> getVotesForMe()
{
return votesForMe;
}
Expand All @@ -284,7 +271,7 @@ public long getLastLogIndexBeforeWeBecameLeader()
return lastLogIndexBeforeWeBecameLeader;
}

public FollowerStates getFollowerStates()
public FollowerStates<CoreMember> getFollowerStates()
{
return followerStates;
}
Expand Down

0 comments on commit 65ce44b

Please sign in to comment.