Skip to content

Commit

Permalink
Id Allocation and Raft Membership state are stored on disk so that they can be retrieved at recovery time.
Browse files Browse the repository at this point in the history

This is necessary to allow us to (eventually) prune the Raft log.
  • Loading branch information
jimwebber committed Jan 8, 2016
1 parent d8fd792 commit 7de91ec
Show file tree
Hide file tree
Showing 102 changed files with 2,408 additions and 998 deletions.
Expand Up @@ -28,7 +28,6 @@
import org.neo4j.coreedge.server.AdvertisedSocketAddress;
import org.neo4j.coreedge.server.CoreMember;

import static org.neo4j.coreedge.server.AdvertisedSocketAddress.address;
import static org.neo4j.coreedge.discovery.HazelcastServerLifecycle.TRANSACTION_SERVER;
import static org.neo4j.coreedge.discovery.HazelcastServerLifecycle.RAFT_SERVER;

Expand Down Expand Up @@ -68,8 +67,8 @@ private Set<CoreMember> toCoreMembers( Set<Member> members )
for ( Member member : members )
{
coreMembers.add( new CoreMember(
address( member.getStringAttribute( TRANSACTION_SERVER ) ),
address( member.getStringAttribute( RAFT_SERVER ) )
new AdvertisedSocketAddress( member.getStringAttribute( TRANSACTION_SERVER ) ),
new AdvertisedSocketAddress( member.getStringAttribute( RAFT_SERVER ) )
));
}

Expand All @@ -86,6 +85,6 @@ public int getNumberOfEdgeServers()
public AdvertisedSocketAddress firstTransactionServer()
{
Member member = hazelcast.getCluster().getMembers().iterator().next();
return address( member.getStringAttribute( TRANSACTION_SERVER ) );
return new AdvertisedSocketAddress( member.getStringAttribute( TRANSACTION_SERVER ) );
}
}
Expand Up @@ -23,7 +23,7 @@
import java.util.Set;
import java.util.TreeMap;

import org.neo4j.coreedge.raft.state.FollowerStates;
import org.neo4j.coreedge.raft.state.follower.FollowerStates;

public class Followers
{
Expand Down
Expand Up @@ -22,6 +22,7 @@
import java.io.Serializable;
import java.util.Set;
import java.util.concurrent.locks.LockSupport;
import java.util.function.Supplier;

import org.neo4j.coreedge.raft.log.RaftLog;
import org.neo4j.coreedge.raft.log.RaftLogEntry;
Expand All @@ -35,10 +36,10 @@
import org.neo4j.coreedge.raft.roles.Role;
import org.neo4j.coreedge.raft.state.RaftState;
import org.neo4j.coreedge.raft.state.ReadableRaftState;
import org.neo4j.coreedge.raft.state.TermStore;
import org.neo4j.coreedge.raft.state.VoteStore;
import org.neo4j.coreedge.server.core.RaftStorageExceptionHandler;
import org.neo4j.coreedge.raft.state.term.TermStore;
import org.neo4j.coreedge.raft.state.vote.VoteStore;
import org.neo4j.helpers.Clock;
import org.neo4j.kernel.internal.DatabaseHealth;
import org.neo4j.logging.Log;
import org.neo4j.logging.LogProvider;

Expand Down Expand Up @@ -86,7 +87,7 @@ public enum Timeouts implements RenewableTimeoutService.TimeoutName
private final long electionTimeout;
private final long leaderWaitTimeout;

private final RaftStorageExceptionHandler raftStorageExceptionHandler;
private final Supplier<DatabaseHealth> databaseHealthSupplier;
private Clock clock;

private final Outbound<MEMBER> outbound;
Expand All @@ -101,7 +102,7 @@ public RaftInstance( MEMBER myself, TermStore termStore, VoteStore<MEMBER> voteS
final Inbound inbound, final Outbound<MEMBER> outbound, long leaderWaitTimeout,
LogProvider logProvider, RaftMembershipManager<MEMBER> membershipManager,
RaftLogShippingManager<MEMBER> logShipping,
RaftStorageExceptionHandler raftStorageExceptionHandler,
Supplier<DatabaseHealth> databaseHealthSupplier,
Clock clock )

{
Expand All @@ -115,7 +116,7 @@ public RaftInstance( MEMBER myself, TermStore termStore, VoteStore<MEMBER> voteS
this.leaderWaitTimeout = leaderWaitTimeout;
this.outbound = outbound;
this.logShipping = logShipping;
this.raftStorageExceptionHandler = raftStorageExceptionHandler;
this.databaseHealthSupplier = databaseHealthSupplier;
this.clock = clock;
this.log = logProvider.getLog( getClass() );

Expand Down Expand Up @@ -164,7 +165,7 @@ public synchronized void bootstrapWithInitialMembers( RaftGroup<MEMBER> memberSe
}
catch ( RaftStorageException e )
{
raftStorageExceptionHandler.panic( e );
databaseHealthSupplier.get().panic( e );
throw new BootstrapException( e );
}
}
Expand Down Expand Up @@ -261,7 +262,7 @@ public synchronized void handle( Serializable incomingMessage )
catch ( RaftStorageException e )
{
log.error( "Failed to process RAFT message " + incomingMessage, e );
raftStorageExceptionHandler.panic( e );
databaseHealthSupplier.get().panic( e );
}
finally
{
Expand Down
Expand Up @@ -19,20 +19,23 @@
*/
package org.neo4j.coreedge.raft;

import java.util.function.Supplier;

import org.neo4j.coreedge.raft.log.InMemoryRaftLog;
import org.neo4j.coreedge.raft.log.RaftLog;
import org.neo4j.coreedge.raft.membership.RaftGroup;
import org.neo4j.coreedge.raft.membership.RaftMembershipManager;
import org.neo4j.coreedge.raft.net.Inbound;
import org.neo4j.coreedge.raft.net.Outbound;
import org.neo4j.coreedge.raft.replication.LocalReplicator;
import org.neo4j.coreedge.raft.replication.LeaderOnlyReplicator;
import org.neo4j.coreedge.raft.replication.shipping.RaftLogShippingManager;
import org.neo4j.coreedge.raft.state.InMemoryTermStore;
import org.neo4j.coreedge.raft.state.InMemoryVoteStore;
import org.neo4j.coreedge.raft.state.TermStore;
import org.neo4j.coreedge.raft.state.VoteStore;
import org.neo4j.coreedge.server.core.RaftStorageExceptionHandler;
import org.neo4j.coreedge.raft.state.membership.InMemoryRaftMembershipState;
import org.neo4j.coreedge.raft.state.term.InMemoryTermStore;
import org.neo4j.coreedge.raft.state.term.TermStore;
import org.neo4j.coreedge.raft.state.vote.InMemoryVoteStore;
import org.neo4j.coreedge.raft.state.vote.VoteStore;
import org.neo4j.helpers.Clock;
import org.neo4j.kernel.internal.DatabaseHealth;
import org.neo4j.logging.LogProvider;
import org.neo4j.logging.NullLogProvider;

Expand All @@ -46,10 +49,13 @@ public class RaftInstanceBuilder<MEMBER>
private TermStore termStore = new InMemoryTermStore();
private VoteStore<MEMBER> voteStore = new InMemoryVoteStore<>();
private RaftLog raftLog = new InMemoryRaftLog();
private RenewableTimeoutService renewableTimeoutService = new DelayedRenewableTimeoutService( Clock.SYSTEM_CLOCK, NullLogProvider.getInstance() );
private RenewableTimeoutService renewableTimeoutService = new DelayedRenewableTimeoutService( Clock.SYSTEM_CLOCK,
NullLogProvider.getInstance() );

private Inbound inbound = handler -> {};
private Outbound<MEMBER> outbound = ( advertisedSocketAddress, messages ) -> {};
private Inbound inbound = handler -> {
};
private Outbound<MEMBER> outbound = ( advertisedSocketAddress, messages ) -> {
};

private LogProvider logProvider = NullLogProvider.getInstance();
private Clock clock = Clock.SYSTEM_CLOCK;
Expand All @@ -58,10 +64,11 @@ public class RaftInstanceBuilder<MEMBER>
private long heartbeatInterval = 150;
private long leaderWaitTimeout = 10000;
private long catchupTimeout = 30000;
private long retryTimeMillis = electionTimeout/2;
private long retryTimeMillis = electionTimeout / 2;
private int catchupBatchSize = 64;
private int maxAllowedShippingLag = 256;
private RaftStorageExceptionHandler raftStorageExceptionHandler;
private Supplier<DatabaseHealth> databaseHealthSupplier;
private InMemoryRaftMembershipState<MEMBER> raftMembership = new InMemoryRaftMembershipState<>();

public RaftInstanceBuilder( MEMBER member, int expectedClusterSize, RaftGroup.Builder<MEMBER> memberSetBuilder )
{
Expand All @@ -72,15 +79,17 @@ public RaftInstanceBuilder( MEMBER member, int expectedClusterSize, RaftGroup.Bu

public RaftInstance<MEMBER> build()
{
LocalReplicator<MEMBER, MEMBER> localReplicator = new LocalReplicator<>( member, member, outbound );
RaftMembershipManager<MEMBER> membershipManager = new RaftMembershipManager<>( localReplicator,
memberSetBuilder, raftLog, logProvider, expectedClusterSize, electionTimeout, clock, catchupTimeout );
LeaderOnlyReplicator<MEMBER, MEMBER> leaderOnlyReplicator = new LeaderOnlyReplicator<>( member, member,
outbound );
RaftMembershipManager<MEMBER> membershipManager = new RaftMembershipManager<>( leaderOnlyReplicator,
memberSetBuilder, raftLog, logProvider, expectedClusterSize, electionTimeout, clock, catchupTimeout,
raftMembership );
RaftLogShippingManager<MEMBER> logShipping = new RaftLogShippingManager<>( outbound, logProvider, raftLog,
clock, member, membershipManager, retryTimeMillis, catchupBatchSize, maxAllowedShippingLag );

return new RaftInstance<>( member, termStore, voteStore, raftLog, electionTimeout, heartbeatInterval,
renewableTimeoutService, inbound, outbound, leaderWaitTimeout, logProvider, membershipManager, logShipping,
raftStorageExceptionHandler, clock );
renewableTimeoutService, inbound, outbound, leaderWaitTimeout, logProvider, membershipManager,
logShipping, databaseHealthSupplier, clock );
}

public RaftInstanceBuilder<MEMBER> leaderWaitTimeout( long leaderWaitTimeout )
Expand Down Expand Up @@ -113,13 +122,13 @@ public RaftInstanceBuilder<MEMBER> raftLog( RaftLog raftLog )
return this;
}

public RaftInstanceBuilder<MEMBER> raftStorageExceptionHandler( RaftStorageExceptionHandler raftStorageExceptionHandler)
public RaftInstanceBuilder<MEMBER> databaseHealth( final DatabaseHealth databaseHealth)
{
this.raftStorageExceptionHandler = raftStorageExceptionHandler;
this.databaseHealthSupplier = () -> databaseHealth;
return this;
}

public RaftInstanceBuilder<MEMBER> clock(Clock clock)
public RaftInstanceBuilder<MEMBER> clock( Clock clock )
{
this.clock = clock;
return this;
Expand Down
Expand Up @@ -40,21 +40,21 @@ public class InMemoryRaftLog implements RaftLog
public void replay() throws Throwable
{
int index = 0;
for ( ; index <= commitIndex; index++ )
for (; index <= commitIndex; index++ )
{
ReplicatedContent content = readEntryContent( index );
for ( Listener listener : listeners )
{
listener.onAppended( content );
listener.onAppended( content, index );
listener.onCommitted( content, index );
}
}
for ( ; index <= appendIndex; index++ )
for (; index <= appendIndex; index++ )
{
ReplicatedContent content = readEntryContent( index );
for ( Listener listener : listeners )
{
listener.onAppended( content );
listener.onAppended( content, index );
}
}
}
Expand All @@ -79,11 +79,12 @@ public long append( RaftLogEntry logEntry ) throws RaftStorageException
logEntry.term(), logEntry.toString(), term ) );
}

appendIndex++;
for ( Listener listener : listeners )
{
listener.onAppended( logEntry.content() );
listener.onAppended( logEntry.content(), appendIndex );
}
raftLog.put( ++appendIndex, logEntry );
raftLog.put( appendIndex, logEntry );
return appendIndex;
}

Expand Down
Expand Up @@ -27,8 +27,8 @@

import org.neo4j.coreedge.raft.log.monitoring.RaftLogAppendIndexMonitor;
import org.neo4j.coreedge.raft.log.monitoring.RaftLogCommitIndexMonitor;
import org.neo4j.coreedge.raft.replication.ReplicatedContent;
import org.neo4j.coreedge.raft.replication.MarshallingException;
import org.neo4j.coreedge.raft.replication.ReplicatedContent;
import org.neo4j.coreedge.raft.replication.Serializer;
import org.neo4j.io.fs.FileSystemAbstraction;
import org.neo4j.io.fs.StoreChannel;
Expand All @@ -37,15 +37,15 @@

/**
* Writes a raft log to disk using 3 files:
* <p/>
* <p>
* 1. entries.log
* ┌─────────────────────────────┐
* │term 8 bytes│
* │contentPointer 8 bytes│
* ├─────────────────────────────┤
* │record length 16 bytes│
* └─────────────────────────────┘
* <p/>
* <p>
* 2. content.log
* ┌─────────────────────────────┐
* │contentLength 4 bytes│
Expand All @@ -55,7 +55,7 @@
* ├─────────────────────────────┤
* │record length variable│
* └─────────────────────────────┘
* <p/>
* <p>
* 3. commit.log
* ┌─────────────────────────────┐
* │committedIndex 8 bytes│
Expand Down Expand Up @@ -88,8 +88,10 @@ public NaiveDurableRaftLog( FileSystemAbstraction fileSystem, File directory, Se
Monitors monitors )
{
this.serializer = serializer;
this.appendIndexMonitor = monitors.newMonitor( RaftLogAppendIndexMonitor.class, getClass(), RaftLog.APPEND_INDEX_TAG );
this.commitIndexMonitor = monitors.newMonitor( RaftLogCommitIndexMonitor.class, getClass(), RaftLog.COMMIT_INDEX_TAG );
this.appendIndexMonitor = monitors.newMonitor( RaftLogAppendIndexMonitor.class, getClass(), RaftLog
.APPEND_INDEX_TAG );
this.commitIndexMonitor = monitors.newMonitor( RaftLogCommitIndexMonitor.class, getClass(), RaftLog
.COMMIT_INDEX_TAG );

try
{
Expand All @@ -109,9 +111,10 @@ public NaiveDurableRaftLog( FileSystemAbstraction fileSystem, File directory, Se
@Override
public void shutdown() throws Throwable
{
Exception container = new Exception("Exception happened during shutdown of RaftLog. See suppressed exceptions for details");
Exception container = new Exception( "Exception happened during shutdown of RaftLog. See suppressed " +
"exceptions for details" );
boolean shouldThrow = false;
shouldThrow = forceAndCloseChannel( entriesChannel, container ) || shouldThrow;
shouldThrow = forceAndCloseChannel( entriesChannel, container );
shouldThrow = forceAndCloseChannel( contentChannel, container ) || shouldThrow;
shouldThrow = forceAndCloseChannel( commitChannel, container ) || shouldThrow;
if ( shouldThrow )
Expand All @@ -123,7 +126,8 @@ public void shutdown() throws Throwable
/**
* This method will try to force and close a store channel. If either of these two operations fails, the exception
* will be added as suppressed in the provided container. In such a case, true will be returned.
* @param channel The channel to close
*
* @param channel The channel to close
* @param container The container to add suppressed exceptions in the case of failure
* @return True iff an exception was thrown by either force() or close()
*/
Expand All @@ -135,7 +139,7 @@ private boolean forceAndCloseChannel( StoreChannel channel, Exception container
channel.force( false );
channel.close();
}
catch( Exception e )
catch ( Exception e )
{
exceptionHappened = true;
container.addSuppressed( e );
Expand All @@ -147,21 +151,21 @@ private boolean forceAndCloseChannel( StoreChannel channel, Exception container
public void replay() throws Throwable
{
int index = 0;
for ( ; index <= commitIndex; index++ )
for (; index <= commitIndex; index++ )
{
ReplicatedContent content = readEntryContent( index );
for ( Listener listener : listeners )
{
listener.onAppended( content );
listener.onAppended( content, index );
listener.onCommitted( content, index );
}
}
for ( ; index <= appendIndex; index++ )
for (; index <= appendIndex; index++ )
{
ReplicatedContent content = readEntryContent( index );
for ( Listener listener : listeners )
{
listener.onAppended( content );
listener.onAppended( content, index );
}
}
}
Expand Down Expand Up @@ -190,11 +194,12 @@ public long append( RaftLogEntry logEntry ) throws RaftStorageException
int length = writeContent( logEntry );
writeEntry( new Entry( logEntry.term(), contentOffset ) );
contentOffset += length;
appendIndex++;
for ( Listener listener : listeners )
{
listener.onAppended( logEntry.content() );
listener.onAppended( logEntry.content(), appendIndex );
}
return ++appendIndex;
return appendIndex;
}
catch ( MarshallingException | IOException e )
{
Expand Down Expand Up @@ -229,7 +234,7 @@ public void truncate( long fromIndex ) throws RaftStorageException
listener.onTruncated( fromIndex );
}
}
term = readEntryTerm( appendIndex ) ;
term = readEntryTerm( appendIndex );
}
catch ( IOException e )
{
Expand Down Expand Up @@ -273,14 +278,14 @@ public void commit( final long newCommitIndex ) throws RaftStorageException
@Override
public long appendIndex()
{
appendIndexMonitor.appendIndex(appendIndex);
appendIndexMonitor.appendIndex( appendIndex );
return appendIndex;
}

@Override
public long commitIndex()
{
commitIndexMonitor.commitIndex(commitIndex);
commitIndexMonitor.commitIndex( commitIndex );
return commitIndex;
}

Expand Down

0 comments on commit 7de91ec

Please sign in to comment.