Skip to content

Commit

Permalink
Introduce a better replicator abstraction and state machine model.
Browse files Browse the repository at this point in the history
This commit introduces a replicator abstraction for RAFT that internally
manages tracking of operations and results together with a progress tracker.
This then allows the state machines and service implementations to be greatly
streamlined and simplified.

All replicated content that flows through the RaftReplicator now gets associated
with a unique session/operationId pair as captured by the DistributedOperation class.
It also carries the command that will be dispatched to the associated state machine.

All state machines now implement a simple StateMachine interface that is aligned
with Lamport's consensus state machine model. They handle commands, carry their
state and produce results.

The individual state machines are collected in the CoreStateMachines class which
is handled by CoreState. This class now owns the session tracking which lives
one step above, on the replication level, and not as before at the state machine level.
  • Loading branch information
martinfurmanski committed Mar 31, 2016
1 parent 367f026 commit f5ed09b
Show file tree
Hide file tree
Showing 74 changed files with 2,105 additions and 2,578 deletions.
Expand Up @@ -45,7 +45,7 @@
import org.neo4j.coreedge.catchup.tx.core.TxPullRequestHandler;
import org.neo4j.coreedge.catchup.tx.core.TxPullResponseEncoder;
import org.neo4j.coreedge.catchup.tx.edge.TxStreamFinishedResponseEncoder;
import org.neo4j.coreedge.raft.state.StateMachine;
import org.neo4j.coreedge.raft.state.CoreState;
import org.neo4j.coreedge.server.ListenSocketAddress;
import org.neo4j.coreedge.server.logging.ExceptionLoggingHandler;
import org.neo4j.helpers.NamedThreadFactory;
Expand All @@ -70,7 +70,7 @@ public class CatchupServer extends LifecycleAdapter
private final Supplier<NeoStoreDataSource> dataSourceSupplier;

private final NamedThreadFactory threadFactory = new NamedThreadFactory( "catchup-server" );
private final Supplier<StateMachine> stateMachine;
private final CoreState coreState;
private final ListenSocketAddress listenAddress;

private EventLoopGroup workerGroup;
Expand All @@ -84,10 +84,10 @@ public CatchupServer( LogProvider logProvider,
Supplier<LogicalTransactionStore> logicalTransactionStoreSupplier,
Supplier<NeoStoreDataSource> dataSourceSupplier,
Supplier<CheckPointer> checkPointerSupplier,
Supplier<StateMachine> stateMachine,
CoreState coreState,
ListenSocketAddress listenAddress, Monitors monitors )
{
this.stateMachine = stateMachine;
this.coreState = coreState;
this.listenAddress = listenAddress;
this.transactionIdStoreSupplier = transactionIdStoreSupplier;
this.storeIdSupplier = storeIdSupplier;
Expand Down Expand Up @@ -145,7 +145,7 @@ protected void initChannel( SocketChannel ch ) throws Exception
checkPointerSupplier ) );

pipeline.addLast( new GetRaftStateRequestDecoder( protocol ) );
pipeline.addLast( new GetRaftStateRequestHandler( protocol, stateMachine ) );
pipeline.addLast( new GetRaftStateRequestHandler( protocol, coreState ) );

pipeline.addLast( new ExceptionLoggingHandler( log ) );
}
Expand Down
Expand Up @@ -21,33 +21,32 @@

import java.io.IOException;
import java.util.Map;
import java.util.function.Supplier;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

import org.neo4j.coreedge.catchup.CatchupServerProtocol;
import org.neo4j.coreedge.catchup.ResponseMessageType;
import org.neo4j.coreedge.catchup.storecopy.edge.GetRaftStateRequest;
import org.neo4j.coreedge.raft.state.StateMachine;
import org.neo4j.coreedge.raft.state.CoreState;

import static org.neo4j.coreedge.catchup.CatchupServerProtocol.NextMessage;

public class GetRaftStateRequestHandler extends SimpleChannelInboundHandler<GetRaftStateRequest>
{
private final CatchupServerProtocol protocol;
private final Supplier<StateMachine> stateMachine;
private final CoreState coreState;

public GetRaftStateRequestHandler( CatchupServerProtocol protocol, Supplier<StateMachine> stateMachine )
public GetRaftStateRequestHandler( CatchupServerProtocol protocol, CoreState coreState )
{
this.protocol = protocol;
this.stateMachine = stateMachine;
this.coreState = coreState;
}

@Override
protected void channelRead0( ChannelHandlerContext ctx, GetRaftStateRequest msg ) throws Exception
{
sendStates( ctx, stateMachine.get().snapshot() );
sendStates( ctx, coreState.snapshot() );
protocol.expect( NextMessage.MESSAGE_TYPE );
}

Expand Down
Expand Up @@ -29,7 +29,7 @@
import org.neo4j.coreedge.catchup.storecopy.core.RaftStateType;
import org.neo4j.coreedge.catchup.storecopy.edge.CoreClient;
import org.neo4j.coreedge.catchup.tx.edge.RaftStateSnapshotListener;
import org.neo4j.coreedge.raft.state.StateMachine;
import org.neo4j.coreedge.raft.state.CoreState;
import org.neo4j.coreedge.server.AdvertisedSocketAddress;

public class StateFetcher
Expand All @@ -41,7 +41,7 @@ public StateFetcher( CoreClient coreClient )
this.coreClient = coreClient;
}

public void copyRaftState( AdvertisedSocketAddress from, StateMachine stateMachine ) throws StoreCopyFailedException
public void copyRaftState( AdvertisedSocketAddress from, CoreState coreState ) throws StoreCopyFailedException
{
HashMap<RaftStateType, Object> map = new HashMap<>();

Expand All @@ -61,7 +61,7 @@ public void copyRaftState( AdvertisedSocketAddress from, StateMachine stateMachi
try
{
HashMap<RaftStateType, Object> snapshot = completableSnapshot.get( 10, TimeUnit.SECONDS );
stateMachine.installSnapshot( snapshot );
coreState.installSnapshots( snapshot );

coreClient.removeRaftStateSnapshotListener( listener );
}
Expand Down
Expand Up @@ -33,8 +33,8 @@
import org.neo4j.coreedge.raft.outcome.BatchAppendLogEntries;
import org.neo4j.coreedge.raft.outcome.LogCommand;
import org.neo4j.coreedge.raft.outcome.TruncateLogCommand;
import org.neo4j.coreedge.raft.replication.LeaderOnlyReplicator;
import org.neo4j.coreedge.raft.replication.ReplicatedContent;
import org.neo4j.coreedge.raft.replication.Replicator;
import org.neo4j.coreedge.raft.roles.Role;
import org.neo4j.coreedge.raft.state.StateStorage;
import org.neo4j.coreedge.raft.state.follower.FollowerStates;
Expand Down Expand Up @@ -62,7 +62,7 @@ public class RaftMembershipManager<MEMBER> implements RaftMembership<MEMBER>, Me

private int uncommittedMemberChanges = 0;

private final Replicator replicator;
private final LeaderOnlyReplicator replicator;
private final RaftGroup.Builder<MEMBER> memberSetBuilder;
private final ReadableRaftLog entryLog;
private final Log log;
Expand All @@ -71,7 +71,7 @@ public class RaftMembershipManager<MEMBER> implements RaftMembership<MEMBER>, Me
private final RaftMembershipState<MEMBER> raftMembershipState;
private long lastApplied = -1;

public RaftMembershipManager( Replicator replicator, RaftGroup.Builder<MEMBER> memberSetBuilder, RaftLog entryLog,
public RaftMembershipManager( LeaderOnlyReplicator replicator, RaftGroup.Builder<MEMBER> memberSetBuilder, RaftLog entryLog,
LogProvider logProvider, int expectedClusterSize, long electionTimeout,
Clock clock, long catchupTimeout,
StateStorage<RaftMembershipState<MEMBER>> stateStorage )
Expand Down Expand Up @@ -183,7 +183,7 @@ private void onTruncated( long commitIndex ) throws IOException, RaftLogCompacte
if ( lastMembershipEntry != null )
{
raftMembershipState.setVotingMembers( lastMembershipEntry.other().getMembers() );
raftMembershipState.logIndex( lastMembershipEntry.first().longValue() );
raftMembershipState.logIndex( lastMembershipEntry.first() );
stateStorage.persistStoreData( raftMembershipState );
uncommittedMemberChanges = lastMembershipEntry.first() <= commitIndex ? 0 : 1;
}
Expand Down Expand Up @@ -267,14 +267,7 @@ else if ( isSafeToRemoveMember() && superfluousMembers().size() > 0 )
@Override
public void doConsensus( Set<MEMBER> newVotingMemberSet )
{
try
{
replicator.replicate( memberSetBuilder.build( newVotingMemberSet ) );
}
catch ( Replicator.ReplicationFailedException e )
{
// TODO: log
}
replicator.replicate( memberSetBuilder.build( newVotingMemberSet ) );
}

@Override
Expand Down
Expand Up @@ -19,12 +19,14 @@
*/
package org.neo4j.coreedge.raft.net;

import io.netty.buffer.ByteBuf;

import java.io.IOException;

import io.netty.buffer.ByteBuf;
import org.neo4j.coreedge.raft.NewLeaderBarrier;
import org.neo4j.coreedge.raft.membership.CoreMemberSet;
import org.neo4j.coreedge.raft.membership.CoreMemberSetSerializer;
import org.neo4j.coreedge.raft.replication.DistributedOperation;
import org.neo4j.coreedge.raft.replication.ReplicatedContent;
import org.neo4j.coreedge.raft.replication.id.ReplicatedIdAllocationRequest;
import org.neo4j.coreedge.raft.replication.id.ReplicatedIdAllocationRequestSerializer;
Expand All @@ -48,6 +50,7 @@ public class CoreReplicatedContentMarshal implements ChannelMarshal<ReplicatedCo
private static final byte TOKEN_REQUEST_TYPE = 4;
private static final byte NEW_LEADER_BARRIER_TYPE = 5;
private static final byte LOCK_TOKEN_REQUEST = 6;
private static final byte DISTRIBUTED_OPERATION = 7;

@Override
public void marshal( ReplicatedContent content, WritableChannel channel ) throws IOException
Expand Down Expand Up @@ -81,6 +84,11 @@ else if( content instanceof ReplicatedLockTokenRequest )
channel.put( LOCK_TOKEN_REQUEST );
ReplicatedLockTokenSerializer.marshal( (ReplicatedLockTokenRequest<CoreMember>) content, channel );
}
else if( content instanceof DistributedOperation )
{
channel.put( DISTRIBUTED_OPERATION );
((DistributedOperation) content).serialize( channel );
}
else
{
throw new IllegalArgumentException( "Unknown content type " + content.getClass() );
Expand Down Expand Up @@ -114,6 +122,9 @@ public ReplicatedContent unmarshal( ReadableChannel channel ) throws IOException
case LOCK_TOKEN_REQUEST:
content = ReplicatedLockTokenSerializer.unmarshal( channel );
break;
case DISTRIBUTED_OPERATION:
content = DistributedOperation.deserialize( channel );
break;
default:
throw new IllegalArgumentException( String.format( "Unknown content type 0x%x", type ) );
}
Expand Down Expand Up @@ -157,6 +168,11 @@ else if( content instanceof ReplicatedLockTokenRequest )
buffer.writeByte( LOCK_TOKEN_REQUEST );
ReplicatedLockTokenSerializer.marshal( (ReplicatedLockTokenRequest<CoreMember>) content, buffer );
}
else if( content instanceof DistributedOperation )
{
buffer.writeByte( DISTRIBUTED_OPERATION );
((DistributedOperation)content).serialize( buffer );
}
else
{
throw new IllegalArgumentException( "Unknown content type " + content.getClass() );
Expand Down Expand Up @@ -190,6 +206,9 @@ public ReplicatedContent unmarshal( ByteBuf buffer )
case LOCK_TOKEN_REQUEST:
content = ReplicatedLockTokenSerializer.unmarshal( buffer );
break;
case DISTRIBUTED_OPERATION:
content = DistributedOperation.deserialize( buffer );
break;
default:
throw new IllegalArgumentException( String.format( "Unknown content type 0x%x", type ) );
}
Expand Down
@@ -0,0 +1,128 @@
/*
* Copyright (c) 2002-2016 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.coreedge.raft.replication;

import io.netty.buffer.ByteBuf;

import java.io.IOException;
import java.util.UUID;

import org.neo4j.coreedge.raft.net.CoreReplicatedContentMarshal;
import org.neo4j.coreedge.raft.replication.session.GlobalSession;
import org.neo4j.coreedge.raft.replication.session.LocalOperationId;
import org.neo4j.coreedge.server.CoreMember;
import org.neo4j.storageengine.api.ReadableChannel;
import org.neo4j.storageengine.api.WritableChannel;

/**
* A uniquely identifiable operation.
*/
public class DistributedOperation implements ReplicatedContent
{
private final ReplicatedContent content;
private final GlobalSession<CoreMember> globalSession;
private final LocalOperationId operationId;

public DistributedOperation( ReplicatedContent content, GlobalSession<CoreMember> globalSession, LocalOperationId operationId )
{
this.content = content;
this.globalSession = globalSession;
this.operationId = operationId;
}

public GlobalSession<CoreMember> globalSession()
{
return globalSession;
}

public LocalOperationId operationId()
{
return operationId;
}

public ReplicatedContent content()
{
return content;
}

public void serialize( WritableChannel channel ) throws IOException
{
channel.putLong( globalSession().sessionId().getMostSignificantBits() );
channel.putLong( globalSession().sessionId().getLeastSignificantBits() );
new CoreMember.CoreMemberMarshal().marshal( globalSession().owner(), channel );

channel.putLong( operationId.localSessionId() );
channel.putLong( operationId.sequenceNumber() );

new CoreReplicatedContentMarshal().marshal( content, channel );
}

public static DistributedOperation deserialize( ReadableChannel channel ) throws IOException
{
long mostSigBits = channel.getLong();
long leastSigBits = channel.getLong();
CoreMember owner = new CoreMember.CoreMemberMarshal().unmarshal( channel );
GlobalSession<CoreMember> globalSession = new GlobalSession<>( new UUID( mostSigBits, leastSigBits ), owner );

long localSessionId = channel.getLong();
long sequenceNumber = channel.getLong();
LocalOperationId localOperationId = new LocalOperationId( localSessionId, sequenceNumber );

ReplicatedContent content = new CoreReplicatedContentMarshal().unmarshal( channel );
return new DistributedOperation( content, globalSession, localOperationId );
}

public void serialize( ByteBuf buffer )
{
buffer.writeLong( globalSession().sessionId().getMostSignificantBits() );
buffer.writeLong( globalSession().sessionId().getLeastSignificantBits() );
new CoreMember.CoreMemberMarshal().marshal( (CoreMember) globalSession().owner(), buffer );

buffer.writeLong( operationId.localSessionId() );
buffer.writeLong( operationId.sequenceNumber() );

new CoreReplicatedContentMarshal().marshal( content, buffer );
}

public static ReplicatedContent deserialize( ByteBuf buffer )
{
long mostSigBits = buffer.readLong();
long leastSigBits =buffer.readLong();
CoreMember owner = new CoreMember.CoreMemberMarshal().unmarshal( buffer );
GlobalSession<CoreMember> globalSession = new GlobalSession<>( new UUID( mostSigBits, leastSigBits ), owner );

long localSessionId = buffer.readLong();
long sequenceNumber = buffer.readLong();
LocalOperationId localOperationId = new LocalOperationId( localSessionId, sequenceNumber );

ReplicatedContent content = new CoreReplicatedContentMarshal().unmarshal( buffer );
return new DistributedOperation( content, globalSession, localOperationId );
}

@Override
public String toString()
{
return "DistributedOperation{" +
"content=" + content +
", globalSession=" + globalSession +
", operationId=" + operationId +
'}';
}
}
Expand Up @@ -23,7 +23,7 @@
import org.neo4j.coreedge.raft.net.Outbound;


public class LeaderOnlyReplicator<MEMBER,SOCKET> implements Replicator
public class LeaderOnlyReplicator<MEMBER,SOCKET>
{
private final MEMBER source;
private final SOCKET target;
Expand All @@ -36,8 +36,7 @@ public LeaderOnlyReplicator( MEMBER source, SOCKET target, Outbound<SOCKET> outb
this.outbound = outbound;
}

@Override
public void replicate( ReplicatedContent content ) throws ReplicationFailedException
public void replicate( ReplicatedContent content )
{
outbound.send( target, new RaftMessages.NewEntry.Request<>( source, content ) );
}
Expand Down

0 comments on commit f5ed09b

Please sign in to comment.