Skip to content

Commit

Permalink
Apply commands to state machine in dedicated executor.
Browse files Browse the repository at this point in the history
This means that the main raft thread is no longer
respsonsible for applying commands, so its speed should
not be limited by expensive application.
  • Loading branch information
martinfurmanski authored and apcj committed Feb 24, 2016
1 parent f75c8d5 commit 9f2998e
Show file tree
Hide file tree
Showing 27 changed files with 638 additions and 380 deletions.
@@ -0,0 +1,32 @@
/*
* Copyright (c) 2002-2016 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.coreedge.raft;

/**
* A consensus listener is notified when a particular index in the consensus log
* is considered committed.
*/
public interface ConsensusListener
{
/**
* Called when the highest committed index increases.
*/
void notifyCommitted();
}
Expand Up @@ -41,13 +41,10 @@
import org.neo4j.coreedge.raft.outcome.CommitCommand;
import org.neo4j.coreedge.raft.outcome.LogCommand;
import org.neo4j.coreedge.raft.outcome.Outcome;
import org.neo4j.coreedge.raft.replication.ReplicatedContent;
import org.neo4j.coreedge.raft.replication.shipping.RaftLogShippingManager;
import org.neo4j.coreedge.raft.roles.Role;
import org.neo4j.coreedge.raft.state.LastAppliedTrackingStateMachine;
import org.neo4j.coreedge.raft.state.RaftState;
import org.neo4j.coreedge.raft.state.ReadableRaftState;
import org.neo4j.coreedge.raft.state.StateMachine;
import org.neo4j.coreedge.raft.state.StateStorage;
import org.neo4j.coreedge.raft.state.term.TermState;
import org.neo4j.coreedge.raft.state.vote.VoteState;
Expand Down Expand Up @@ -85,7 +82,6 @@
public class RaftInstance<MEMBER> implements LeaderLocator<MEMBER>, Inbound.MessageHandler, CoreMetaData
{
private final LeaderNotFoundMonitor leaderNotFoundMonitor;
private int flushAfter;

public enum Timeouts implements RenewableTimeoutService.TimeoutName
{
Expand All @@ -101,7 +97,7 @@ public enum Timeouts implements RenewableTimeoutService.TimeoutName
private RenewableTimeoutService.RenewableTimeout electionTimer;
private RaftMembershipManager<MEMBER> membershipManager;

private final LastAppliedTrackingStateMachine stateMachine;
private final ConsensusListener consensusListener;
private final long electionTimeout;
private final long leaderWaitTimeout;

Expand All @@ -116,18 +112,18 @@ public enum Timeouts implements RenewableTimeoutService.TimeoutName
private RaftLogShippingManager<MEMBER> logShipping;

public RaftInstance( MEMBER myself, StateStorage<TermState> termStorage,
StateStorage<VoteState<MEMBER>> voteStorage, RaftLog entryLog,
LastAppliedTrackingStateMachine stateMachine, long electionTimeout, long heartbeatInterval,
RenewableTimeoutService renewableTimeoutService,
final Inbound inbound, final Outbound<MEMBER> outbound, long leaderWaitTimeout,
LogProvider logProvider, RaftMembershipManager<MEMBER> membershipManager,
RaftLogShippingManager<MEMBER> logShipping,
Supplier<DatabaseHealth> databaseHealthSupplier,
Monitors monitors, int flushAfter )
StateStorage<VoteState<MEMBER>> voteStorage, RaftLog entryLog,
ConsensusListener consensusListener, long electionTimeout, long heartbeatInterval,
RenewableTimeoutService renewableTimeoutService,
final Inbound inbound, final Outbound<MEMBER> outbound, long leaderWaitTimeout,
LogProvider logProvider, RaftMembershipManager<MEMBER> membershipManager,
RaftLogShippingManager<MEMBER> logShipping,
Supplier<DatabaseHealth> databaseHealthSupplier,
Monitors monitors )
{
this.myself = myself;
this.entryLog = entryLog;
this.stateMachine = stateMachine;
this.consensusListener = consensusListener;
this.electionTimeout = electionTimeout;
this.heartbeatInterval = heartbeatInterval;

Expand All @@ -137,7 +133,6 @@ public RaftInstance( MEMBER myself, StateStorage<TermState> termStorage,
this.outbound = outbound;
this.logShipping = logShipping;
this.databaseHealthSupplier = databaseHealthSupplier;
this.flushAfter = flushAfter;
this.log = logProvider.getLog( getClass() );

this.membershipManager = membershipManager;
Expand Down Expand Up @@ -262,16 +257,8 @@ private void handleOutcome( Outcome<MEMBER> outcome ) throws RaftStorageExceptio

raftState.update( outcome );
membershipManager.processLog( outcome.getLogCommands() );
consensusListener.notifyCommitted();

for ( long index = stateMachine.lastApplied() + 1; index <= raftState.entryLog().commitIndex(); index++ )
{
ReplicatedContent content = raftState.entryLog().readEntryContent( index );
stateMachine.applyCommand( content, index );
if ( index % this.flushAfter == 0 )
{
stateMachine.flush();
}
}
volatileLeader.set( outcome.getLeader() );
}

Expand Down Expand Up @@ -420,4 +407,4 @@ public Set<MEMBER> replicationMembers()
{
return membershipManager.replicationMembers();
}
}
}
@@ -0,0 +1,75 @@
/*
* Copyright (c) 2002-2016 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.coreedge.raft.state;

import java.io.IOException;

import org.neo4j.storageengine.api.ReadPastEndException;
import org.neo4j.storageengine.api.ReadableChannel;
import org.neo4j.storageengine.api.WritableChannel;

public class LastAppliedState
{
private final long lastApplied;

public LastAppliedState( long lastApplied )
{
this.lastApplied = lastApplied;
}

public long get()
{
return lastApplied;
}

public static class Marshal implements StateMarshal<LastAppliedState>
{
@Override
public LastAppliedState startState()
{
return new LastAppliedState( -1 );
}

@Override
public long ordinal( LastAppliedState lastAppliedState )
{
return lastAppliedState.get();
}

@Override
public void marshal( LastAppliedState lastAppliedState, WritableChannel channel ) throws IOException
{
channel.putLong( lastAppliedState.get() );
}

@Override
public LastAppliedState unmarshal( ReadableChannel source ) throws IOException
{
try
{
return new LastAppliedState( source.getLong() );
}
catch( ReadPastEndException e )
{
return null;
}
}
}
}
@@ -0,0 +1,124 @@
/*
* Copyright (c) 2002-2016 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.coreedge.raft.state;

import java.io.IOException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.function.Supplier;

import org.neo4j.coreedge.raft.ConsensusListener;
import org.neo4j.coreedge.raft.log.RaftLogEntry;
import org.neo4j.coreedge.raft.log.RaftStorageException;
import org.neo4j.coreedge.raft.log.ReadableRaftLog;
import org.neo4j.kernel.internal.DatabaseHealth;
import org.neo4j.kernel.lifecycle.LifecycleAdapter;
import org.neo4j.logging.Log;
import org.neo4j.logging.LogProvider;

import static java.lang.System.currentTimeMillis;

public class StateMachineApplier extends LifecycleAdapter implements ConsensusListener
{
public static final long NOTHING_APPLIED = -1;

private final StateMachine stateMachine;
private final ReadableRaftLog raftLog;
private final StateStorage<LastAppliedState> lastAppliedStorage;
private final int flushEvery;
private final Supplier<DatabaseHealth> dbHealth;
private final Log log;
private long lastApplied = NOTHING_APPLIED;

private Executor executor;

private long commitIndex = NOTHING_APPLIED;

public StateMachineApplier(
StateMachine stateMachine,
ReadableRaftLog raftLog,
StateStorage<LastAppliedState> lastAppliedStorage,
Executor executor,
int flushEvery,
Supplier<DatabaseHealth> dbHealth,
LogProvider logProvider )
{
this.stateMachine = stateMachine;
this.raftLog = raftLog;
this.lastAppliedStorage = lastAppliedStorage;
this.flushEvery = flushEvery;
this.log = logProvider.getLog( getClass() );
this.dbHealth = dbHealth;
this.executor = executor;
}

@Override
public synchronized void notifyCommitted()
{
long commitIndex = raftLog.commitIndex();
if ( this.commitIndex != commitIndex )
{
this.commitIndex = commitIndex;
executor.execute( () -> {

try
{
applyUpTo( commitIndex );
}
catch ( Exception e )
{
log.error( "Failed to apply up to index " + commitIndex, e );
dbHealth.get().panic( e );
}
} );
}
}

private void applyUpTo( long commitIndex ) throws IOException, RaftStorageException
{

while ( lastApplied < commitIndex )
{
long indexToApply = lastApplied + 1;

RaftLogEntry logEntry = raftLog.readLogEntry( indexToApply );
stateMachine.applyCommand( logEntry.content(), indexToApply );

lastApplied = indexToApply;

if ( indexToApply % this.flushEvery == 0 )
{
stateMachine.flush();
lastAppliedStorage.persistStoreData( new LastAppliedState( lastApplied ) );
}
}
}

@Override
public synchronized void start() throws IOException, RaftStorageException
{
lastApplied = lastAppliedStorage.getInitialState().get();
log.info( "Replaying commands from index %d to index %d", lastApplied, raftLog.commitIndex() );

long start = currentTimeMillis();
applyUpTo( raftLog.commitIndex() );
log.info( "Replay done, took %d ms", currentTimeMillis() - start );
}
}
Expand Up @@ -142,12 +142,16 @@ public String toString()
setting( "core_edge.disable_middleware_logging", BOOLEAN, TRUE );

@Description("The maximum file size before the id allocation file is rotated (in unit of entries)")
public static final Setting<Integer> id_alloc_state_size = setting( "core_edge.id_alloc_state_size", INTEGER,
"1000" );
public static final Setting<Integer> last_applied_state_size =
setting( "core_edge.last_applied_state_size", INTEGER, "1000" );

@Description("The maximum file size before the id allocation file is rotated (in unit of entries)")
public static final Setting<Integer> id_alloc_state_size =
setting( "core_edge.id_alloc_state_size", INTEGER, "1000" );

@Description("The maximum file size before the membership state file is rotated (in unit of entries)")
public static final Setting<Integer> raft_membership_state_size = setting( "core_edge.raft_membership_state_size",
INTEGER, "1000" );
public static final Setting<Integer> raft_membership_state_size =
setting( "core_edge.raft_membership_state_size", INTEGER, "1000" );

@Description("The maximum file size before the vote state file is rotated (in unit of entries)")
public static final Setting<Integer> vote_state_size = setting( "core_edge.raft_vote_state_size", INTEGER, "1000" );
Expand Down
Expand Up @@ -29,6 +29,7 @@
import org.neo4j.coreedge.raft.RaftServer;
import org.neo4j.coreedge.raft.membership.MembershipWaiter;
import org.neo4j.coreedge.raft.replication.id.ReplicatedIdGeneratorFactory;
import org.neo4j.coreedge.raft.state.StateMachineApplier;
import org.neo4j.coreedge.server.CoreMember;
import org.neo4j.kernel.impl.transaction.state.DataSourceManager;
import org.neo4j.kernel.lifecycle.LifeSupport;
Expand All @@ -41,22 +42,25 @@
public class CoreServerStartupProcess
{

public static LifeSupport createLifeSupport( DataSourceManager dataSourceManager,
ReplicatedIdGeneratorFactory idGeneratorFactory,
RaftInstance<CoreMember> raft, RaftLogReplay raftLogReplay, RaftServer<CoreMember> raftServer,
CatchupServer catchupServer,
DelayedRenewableTimeoutService raftTimeoutService,
MembershipWaiter<CoreMember> membershipWaiter,
long joinCatchupTimeout,
RecoverTransactionLogState recoverTransactionLogState,
Lifecycle tokenLife )
public static LifeSupport createLifeSupport(
DataSourceManager dataSourceManager,
ReplicatedIdGeneratorFactory idGeneratorFactory,
RaftInstance<CoreMember> raft,
StateMachineApplier recoverableStateMachine,
RaftServer<CoreMember> raftServer,
CatchupServer catchupServer,
DelayedRenewableTimeoutService raftTimeoutService,
MembershipWaiter<CoreMember> membershipWaiter,
long joinCatchupTimeout,
RecoverTransactionLogState recoverTransactionLogState,
Lifecycle tokenLife )
{
LifeSupport services = new LifeSupport();
services.add( dataSourceManager );
services.add( idGeneratorFactory );
services.add( recoverTransactionLogState );
services.add( tokenLife );
services.add( raftLogReplay );
services.add( recoverableStateMachine );
services.add( raftServer );
services.add( catchupServer );
services.add( raftTimeoutService );
Expand Down Expand Up @@ -98,5 +102,4 @@ public void start() throws Throwable
}
}
}

}

0 comments on commit 9f2998e

Please sign in to comment.