Skip to content

Commit

Permalink
Renaming to follow the same 'state' naming convention as the IdAlloca…
Browse files Browse the repository at this point in the history
…tionState and RaftMembershipState.
  • Loading branch information
jimwebber committed Jan 13, 2016
1 parent ce903e8 commit 58cccb3
Show file tree
Hide file tree
Showing 24 changed files with 518 additions and 141 deletions.
Expand Up @@ -36,8 +36,8 @@
import org.neo4j.coreedge.raft.roles.Role;
import org.neo4j.coreedge.raft.state.RaftState;
import org.neo4j.coreedge.raft.state.ReadableRaftState;
import org.neo4j.coreedge.raft.state.term.TermStore;
import org.neo4j.coreedge.raft.state.vote.VoteStore;
import org.neo4j.coreedge.raft.state.term.TermState;
import org.neo4j.coreedge.raft.state.vote.VoteState;
import org.neo4j.helpers.Clock;
import org.neo4j.kernel.internal.DatabaseHealth;
import org.neo4j.logging.Log;
Expand Down Expand Up @@ -97,7 +97,7 @@ public enum Timeouts implements RenewableTimeoutService.TimeoutName

private RaftLogShippingManager<MEMBER> logShipping;

public RaftInstance( MEMBER myself, TermStore termStore, VoteStore<MEMBER> voteStore, RaftLog entryLog,
public RaftInstance( MEMBER myself, TermState termState, VoteState<MEMBER> voteState, RaftLog entryLog,
long electionTimeout, long heartbeatInterval, RenewableTimeoutService renewableTimeoutService,
final Inbound inbound, final Outbound<MEMBER> outbound, long leaderWaitTimeout,
LogProvider logProvider, RaftMembershipManager<MEMBER> membershipManager,
Expand All @@ -122,7 +122,7 @@ public RaftInstance( MEMBER myself, TermStore termStore, VoteStore<MEMBER> voteS

this.membershipManager = membershipManager;

this.state = new RaftState<>( myself, termStore, membershipManager, entryLog, voteStore );
this.state = new RaftState<>( myself, termState, membershipManager, entryLog, voteState );

initTimers();

Expand Down
Expand Up @@ -30,10 +30,10 @@
import org.neo4j.coreedge.raft.replication.LeaderOnlyReplicator;
import org.neo4j.coreedge.raft.replication.shipping.RaftLogShippingManager;
import org.neo4j.coreedge.raft.state.membership.InMemoryRaftMembershipState;
import org.neo4j.coreedge.raft.state.term.InMemoryTermStore;
import org.neo4j.coreedge.raft.state.term.TermStore;
import org.neo4j.coreedge.raft.state.vote.InMemoryVoteStore;
import org.neo4j.coreedge.raft.state.vote.VoteStore;
import org.neo4j.coreedge.raft.state.term.InMemoryTermState;
import org.neo4j.coreedge.raft.state.term.TermState;
import org.neo4j.coreedge.raft.state.vote.InMemoryVoteState;
import org.neo4j.coreedge.raft.state.vote.VoteState;
import org.neo4j.helpers.Clock;
import org.neo4j.kernel.internal.DatabaseHealth;
import org.neo4j.logging.LogProvider;
Expand All @@ -46,8 +46,8 @@ public class RaftInstanceBuilder<MEMBER>
private int expectedClusterSize;
private RaftGroup.Builder<MEMBER> memberSetBuilder;

private TermStore termStore = new InMemoryTermStore();
private VoteStore<MEMBER> voteStore = new InMemoryVoteStore<>();
private TermState termState = new InMemoryTermState();
private VoteState<MEMBER> voteState = new InMemoryVoteState<>();
private RaftLog raftLog = new InMemoryRaftLog();
private RenewableTimeoutService renewableTimeoutService = new DelayedRenewableTimeoutService( Clock.SYSTEM_CLOCK,
NullLogProvider.getInstance() );
Expand Down Expand Up @@ -87,7 +87,7 @@ public RaftInstance<MEMBER> build()
RaftLogShippingManager<MEMBER> logShipping = new RaftLogShippingManager<>( outbound, logProvider, raftLog,
clock, member, membershipManager, retryTimeMillis, catchupBatchSize, maxAllowedShippingLag );

return new RaftInstance<>( member, termStore, voteStore, raftLog, electionTimeout, heartbeatInterval,
return new RaftInstance<>( member, termState, voteState, raftLog, electionTimeout, heartbeatInterval,
renewableTimeoutService, inbound, outbound, leaderWaitTimeout, logProvider, membershipManager,
logShipping, databaseHealthSupplier, clock );
}
Expand Down
Expand Up @@ -29,28 +29,28 @@
import org.neo4j.coreedge.raft.outcome.LogCommand;
import org.neo4j.coreedge.raft.outcome.Outcome;
import org.neo4j.coreedge.raft.state.follower.FollowerStates;
import org.neo4j.coreedge.raft.state.term.TermStore;
import org.neo4j.coreedge.raft.state.vote.VoteStore;
import org.neo4j.coreedge.raft.state.term.TermState;
import org.neo4j.coreedge.raft.state.vote.VoteState;

public class RaftState<MEMBER> implements ReadableRaftState<MEMBER>
{
private final MEMBER myself;
private final RaftMembership<MEMBER> membership;
private final TermStore termStore;
private final TermState termState;
private MEMBER leader;
private long leaderCommit = -1;
private final VoteStore<MEMBER> voteStore;
private final VoteState<MEMBER> voteState;
private Set<MEMBER> votesForMe = new HashSet<>();
private long lastLogIndexBeforeWeBecameLeader = -1;
private FollowerStates<MEMBER> followerStates = new FollowerStates<>();
private final RaftLog entryLog;

public RaftState( MEMBER myself, TermStore termStore, RaftMembership<MEMBER> membership,
RaftLog entryLog, VoteStore<MEMBER> voteStore )
public RaftState( MEMBER myself, TermState termState, RaftMembership<MEMBER> membership,
RaftLog entryLog, VoteState<MEMBER> voteState )
{
this.myself = myself;
this.termStore = termStore;
this.voteStore = voteStore;
this.termState = termState;
this.voteState = voteState;
this.membership = membership;
this.entryLog = entryLog;
}
Expand All @@ -76,7 +76,7 @@ public Set<MEMBER> replicationMembers()
@Override
public long term()
{
return termStore.currentTerm();
return termState.currentTerm();
}

@Override
Expand All @@ -94,7 +94,7 @@ public long leaderCommit()
@Override
public MEMBER votedFor()
{
return voteStore.votedFor();
return voteState.votedFor();
}

@Override
Expand Down Expand Up @@ -123,8 +123,8 @@ public ReadableRaftLog entryLog()

public void update( Outcome<MEMBER> outcome ) throws RaftStorageException
{
termStore.update( outcome.getTerm() );
voteStore.update( outcome.getVotedFor() );
termState.update( outcome.getTerm() );
voteState.update( outcome.getVotedFor() );
leader = outcome.getLeader();
leaderCommit = outcome.getLeaderCommit();
votesForMe = outcome.getVotesForMe();
Expand Down
Expand Up @@ -19,9 +19,7 @@
*/
package org.neo4j.coreedge.raft.state.term;

import org.neo4j.coreedge.raft.state.term.TermStore;

public class InMemoryTermStore implements TermStore
public class InMemoryTermState implements TermState
{
private long term = 0;

Expand Down
@@ -0,0 +1,99 @@
/*
* Copyright (c) 2002-2016 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.coreedge.raft.state.term;

import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;

import org.neo4j.coreedge.raft.log.RaftStorageException;
import org.neo4j.io.fs.FileSystemAbstraction;
import org.neo4j.io.fs.StoreChannel;
import org.neo4j.kernel.lifecycle.LifecycleAdapter;

public class OnDiskTermState extends LifecycleAdapter implements TermState
{
public static final int TERM_BYTES = 8;

private final StoreChannel channel;
private long term;

public OnDiskTermState( FileSystemAbstraction fileSystem, File directory )
{
try
{
channel = fileSystem.open( new File( directory, "term.state" ), "rw" );
term = readTerm();
}
catch ( IOException e )
{
throw new RuntimeException( e );
}
}

@Override
public void shutdown() throws Throwable
{
channel.force( false );
channel.close();
}

@Override
public long currentTerm()
{
return term;
}

@Override
public void update( long newTerm ) throws RaftStorageException
{
if ( newTerm < term )
{
throw new IllegalArgumentException( "Cannot move to a lower term" );
}

try
{
ByteBuffer buffer = ByteBuffer.allocate( TERM_BYTES );
buffer.putLong( newTerm );
buffer.flip();

channel.writeAll( buffer, 0 );
channel.force( false );
}
catch ( IOException e )
{
throw new RaftStorageException( "Failed to update term", e );
}
term = newTerm;
}

private long readTerm() throws IOException
{
if ( channel.size() < TERM_BYTES )
{
return 0;
}
ByteBuffer buffer = ByteBuffer.allocate( TERM_BYTES );
channel.read( buffer, 0 );
buffer.flip();
return buffer.getLong();
}
}
Expand Up @@ -21,7 +21,7 @@

import org.neo4j.coreedge.raft.log.RaftStorageException;

public interface TermStore
public interface TermState
{
long currentTerm();

Expand Down
Expand Up @@ -19,9 +19,7 @@
*/
package org.neo4j.coreedge.raft.state.vote;

import org.neo4j.coreedge.raft.state.vote.VoteStore;

public class InMemoryVoteStore<MEMBER> implements VoteStore<MEMBER>
public class InMemoryVoteState<MEMBER> implements VoteState<MEMBER>
{
MEMBER votedFor;

Expand Down
@@ -0,0 +1,107 @@
/*
* Copyright (c) 2002-2016 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.coreedge.raft.state.vote;

import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;

import org.neo4j.coreedge.server.CoreMember;
import org.neo4j.coreedge.raft.log.RaftStorageException;
import org.neo4j.coreedge.raft.membership.CoreMarshal;
import org.neo4j.io.fs.FileSystemAbstraction;
import org.neo4j.io.fs.StoreChannel;
import org.neo4j.kernel.lifecycle.LifecycleAdapter;

public class OnDiskVoteState extends LifecycleAdapter implements VoteState<CoreMember>
{
private final StoreChannel channel;
private final CoreMarshal coreMemberMarshal = new CoreMarshal();
private CoreMember votedFor;

public OnDiskVoteState( FileSystemAbstraction fileSystem, File directory )
{
try
{
channel = fileSystem.open( new File( directory, "vote.state" ), "rw" );
votedFor = readVote();
}
catch ( IOException e )
{
throw new RuntimeException( e );
}
}

@Override
public void shutdown() throws Throwable
{
channel.force( false );
channel.close();
}

@Override
public CoreMember votedFor()
{
return votedFor;
}

@Override
public void update( CoreMember votedFor ) throws RaftStorageException
{
try
{
if ( votedFor == null )
{
channel.truncate( 0 );
}
else
{
ByteBuf byteBuf = Unpooled.buffer();
coreMemberMarshal.marshal( votedFor, byteBuf );
ByteBuffer buffer = byteBuf.nioBuffer();

channel.writeAll( buffer, 0 );
}
channel.force( false );
}
catch ( IOException e )
{
throw new RaftStorageException( "Failed to update votedFor", e );
}
this.votedFor = votedFor;
}

private CoreMember readVote() throws IOException
{
if ( channel.size() == 0 )
{
return null;
}

ByteBuffer buffer = ByteBuffer.allocate( (int) channel.size() );
channel.read( buffer, 0 );
buffer.flip();

return coreMemberMarshal.unmarshal( Unpooled.wrappedBuffer( buffer ) );
}
}
Expand Up @@ -21,7 +21,7 @@

import org.neo4j.coreedge.raft.log.RaftStorageException;

public interface VoteStore<MEMBER>
public interface VoteState<MEMBER>
{
MEMBER votedFor();

Expand Down

0 comments on commit 58cccb3

Please sign in to comment.