Skip to content

Commit

Permalink
core-edge: fix concurrency issues for exposed raft state
Browse files Browse the repository at this point in the history
MembershipWaiter::Evaluator now calls synchronized state() on the
RaftMachine every round to fetch a fresh view. The returned state
is an immutable copy with a narrower interface.

The immutable sets in the membership manager are now also wrapped
to be unmodifiable, which was already the intended behaviour.
  • Loading branch information
martinfurmanski committed Oct 5, 2016
1 parent ee585d5 commit 4931258
Show file tree
Hide file tree
Showing 8 changed files with 130 additions and 31 deletions.
Expand Up @@ -37,6 +37,7 @@
import org.neo4j.coreedge.core.consensus.roles.Role;
import org.neo4j.coreedge.core.consensus.schedule.RenewableTimeoutService;
import org.neo4j.coreedge.core.consensus.shipping.RaftLogShippingManager;
import org.neo4j.coreedge.core.consensus.state.ExposedRaftState;
import org.neo4j.coreedge.core.consensus.state.RaftState;
import org.neo4j.coreedge.core.consensus.state.ReadableRaftState;
import org.neo4j.coreedge.core.consensus.term.TermState;
Expand Down Expand Up @@ -163,7 +164,7 @@ public synchronized RaftCoreState coreState()
return new RaftCoreState( membershipManager.getCommitted() );
}

public void installCoreState( RaftCoreState coreState ) throws IOException
public synchronized void installCoreState( RaftCoreState coreState ) throws IOException
{
membershipManager.install( coreState.committed() );
}
Expand Down Expand Up @@ -204,7 +205,7 @@ public synchronized void bootstrapWithInitialMembers( RaftGroup memberSet ) thro
}
}

public void setTargetMembershipSet( Set<MemberId> targetMembers )
public synchronized void setTargetMembershipSet( Set<MemberId> targetMembers )
{
membershipManager.setTargetMembershipSet( targetMembers );

Expand Down Expand Up @@ -255,9 +256,14 @@ public synchronized void unregisterListener( Listener listener )
leaderListeners.remove( listener );
}

public ReadableRaftState state()
/**
* Every call to state() gives you an immutable copy of the current state.
*
* @return A fresh view of the state.
*/
public synchronized ExposedRaftState state()
{
return state;
return state.copy();
}

private void notifyLeaderChanges( Outcome outcome )
Expand Down
Expand Up @@ -23,15 +23,15 @@
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

import org.neo4j.coreedge.core.consensus.state.ReadableRaftState;
import org.neo4j.coreedge.core.consensus.RaftMachine;
import org.neo4j.coreedge.core.consensus.state.ExposedRaftState;
import org.neo4j.coreedge.identity.MemberId;
import org.neo4j.kernel.impl.util.JobScheduler;
import org.neo4j.kernel.internal.DatabaseHealth;
import org.neo4j.logging.Log;
import org.neo4j.logging.LogProvider;

import static java.util.concurrent.TimeUnit.MILLISECONDS;

import static org.neo4j.kernel.impl.util.JobScheduler.SchedulingStrategy.POOLED;

/**
Expand Down Expand Up @@ -67,11 +67,11 @@ public MembershipWaiter( MemberId myself, JobScheduler jobScheduler, Supplier<Da
this.log = logProvider.getLog( getClass() );
}

CompletableFuture<Boolean> waitUntilCaughtUpMember( ReadableRaftState raftState )
CompletableFuture<Boolean> waitUntilCaughtUpMember( RaftMachine raft )
{
CompletableFuture<Boolean> catchUpFuture = new CompletableFuture<>();

Evaluator evaluator = new Evaluator( raftState, catchUpFuture, dbHealthSupplier );
Evaluator evaluator = new Evaluator( raft, catchUpFuture, dbHealthSupplier );

JobScheduler.JobHandle jobHandle = jobScheduler.scheduleRecurring(
new JobScheduler.Group( getClass().toString(), POOLED ),
Expand All @@ -84,18 +84,18 @@ CompletableFuture<Boolean> waitUntilCaughtUpMember( ReadableRaftState raftState

private class Evaluator implements Runnable
{
private final ReadableRaftState raftState;
private final RaftMachine raft;
private final CompletableFuture<Boolean> catchUpFuture;

private long lastLeaderCommit;
private final Supplier<DatabaseHealth> dbHealthSupplier;

private Evaluator( ReadableRaftState raftState, CompletableFuture<Boolean> catchUpFuture,
private Evaluator( RaftMachine raft, CompletableFuture<Boolean> catchUpFuture,
Supplier<DatabaseHealth> dbHealthSupplier )
{
this.raftState = raftState;
this.raft = raft;
this.catchUpFuture = catchUpFuture;
this.lastLeaderCommit = raftState.leaderCommit();
this.lastLeaderCommit = raft.state().leaderCommit();
this.dbHealthSupplier = dbHealthSupplier;
}

Expand All @@ -113,7 +113,7 @@ else if ( iAmAVotingMember() && caughtUpWithLeader() )

private boolean iAmAVotingMember()
{
Set votingMembers = raftState.votingMembers();
Set votingMembers = raft.state().votingMembers();
boolean votingMember = votingMembers.contains( myself );
if ( !votingMember )
{
Expand All @@ -126,12 +126,14 @@ private boolean caughtUpWithLeader()
{
boolean caughtUpWithLeader = false;

long localCommit = raftState.commitIndex();
ExposedRaftState state = raft.state();

long localCommit = state.commitIndex();
if ( lastLeaderCommit != -1 )
{
caughtUpWithLeader = localCommit >= lastLeaderCommit;
}
lastLeaderCommit = raftState.leaderCommit();
lastLeaderCommit = state.leaderCommit();
if ( lastLeaderCommit != -1 )
{
long gap = lastLeaderCommit - localCommit;
Expand Down
Expand Up @@ -50,7 +50,7 @@ public MembershipWaiterLifecycle( MembershipWaiter membershipWaiter, Long joinCa
@Override
public void start() throws Throwable
{
CompletableFuture<Boolean> caughtUp = membershipWaiter.waitUntilCaughtUpMember( raft.state() );
CompletableFuture<Boolean> caughtUp = membershipWaiter.waitUntilCaughtUpMember( raft );

try
{
Expand Down
Expand Up @@ -22,6 +22,7 @@
import java.io.IOException;
import java.time.Clock;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.function.LongSupplier;
Expand Down Expand Up @@ -65,8 +66,8 @@ public class RaftMembershipManager extends LifecycleAdapter implements RaftMembe

private final int expectedClusterSize;

private volatile Set<MemberId> votingMembers = new HashSet<>();
private volatile Set<MemberId> replicationMembers = new HashSet<>(); // votingMembers + additionalReplicationMembers
private volatile Set<MemberId> votingMembers = Collections.unmodifiableSet( new HashSet<>() );
private volatile Set<MemberId> replicationMembers = Collections.unmodifiableSet( new HashSet<>() ); // votingMembers + additionalReplicationMembers

private Set<Listener> listeners = new HashSet<>();
private Set<MemberId> additionalReplicationMembers = new HashSet<>();
Expand Down Expand Up @@ -137,12 +138,12 @@ private Set<MemberId> missingMembers()
*/
private void updateMemberSets()
{
votingMembers = state.getLatest();
votingMembers = Collections.unmodifiableSet( state.getLatest() );

HashSet<MemberId> newReplicationMembers = new HashSet<>( votingMembers );
newReplicationMembers.addAll( additionalReplicationMembers );

replicationMembers = newReplicationMembers;
replicationMembers = Collections.unmodifiableSet( newReplicationMembers );
listeners.forEach( Listener::onMembershipChanged );
}

Expand Down
@@ -0,0 +1,37 @@
/*
* Copyright (c) 2002-2016 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.coreedge.core.consensus.state;

import java.util.Set;

import org.neo4j.coreedge.identity.MemberId;

public interface ExposedRaftState
{
long leaderCommit();

long commitIndex();

long appendIndex();

long term();

Set<MemberId> votingMembers();
}
Expand Up @@ -207,4 +207,47 @@ private void logIfLeaderChanged( MemberId leader )
log.info( "Leader changed from %s to %s", this.leader, leader );
}
}

public ExposedRaftState copy()
{
return new ExposedRaftState()
{
final long leaderCommit = RaftState.this.leaderCommit();
final long commitIndex = RaftState.this.commitIndex();
final long appendIndex = RaftState.this.entryLog().appendIndex();
final long term = RaftState.this.term();

final Set<MemberId> votingMembers = RaftState.this.votingMembers(); // returned set is never mutated

@Override
public long leaderCommit()
{
return this.leaderCommit;
}

@Override
public long commitIndex()
{
return this.commitIndex;
}

@Override
public long appendIndex()
{
return this.appendIndex;
}

@Override
public long term()
{
return this.term;
}

@Override
public Set<MemberId> votingMembers()
{
return this.votingMembers;
}
};
}
}
Expand Up @@ -176,7 +176,7 @@ public void start() throws Throwable

private boolean haveState()
{
return raftMachine.state().entryLog().appendIndex() > -1;
return raftMachine.state().appendIndex() > -1;
}

@Override
Expand Down
Expand Up @@ -25,9 +25,10 @@
import org.junit.Before;
import org.junit.Test;

import org.neo4j.coreedge.core.consensus.RaftMachine;
import org.neo4j.coreedge.core.consensus.log.InMemoryRaftLog;
import org.neo4j.coreedge.core.consensus.log.RaftLogEntry;
import org.neo4j.coreedge.core.consensus.state.RaftState;
import org.neo4j.coreedge.core.consensus.state.ExposedRaftState;
import org.neo4j.coreedge.core.consensus.state.RaftStateBuilder;
import org.neo4j.kernel.internal.DatabaseHealth;
import org.neo4j.logging.NullLogProvider;
Expand Down Expand Up @@ -62,14 +63,17 @@ public void shouldReturnImmediatelyIfMemberAndCaughtUp() throws Exception

InMemoryRaftLog raftLog = new InMemoryRaftLog();
raftLog.append( new RaftLogEntry( 0, valueOf( 0 ) ) );
RaftState raftState = RaftStateBuilder.raftState()
ExposedRaftState raftState = RaftStateBuilder.raftState()
.votingMembers( member( 0 ) )
.leaderCommit( 0 )
.entryLog( raftLog )
.commitIndex( 0L )
.build();
.build().copy();

CompletableFuture<Boolean> future = waiter.waitUntilCaughtUpMember( raftState );
RaftMachine raft = mock( RaftMachine.class );
when( raft.state() ).thenReturn( raftState );

CompletableFuture<Boolean> future = waiter.waitUntilCaughtUpMember( raft );
jobScheduler.runJob();

future.get( 0, NANOSECONDS );
Expand All @@ -82,12 +86,15 @@ public void shouldTimeoutIfCaughtUpButNotMember() throws Exception
MembershipWaiter waiter = new MembershipWaiter( member( 0 ), jobScheduler, () -> dbHealth, 1,
NullLogProvider.getInstance() );

RaftState raftState = RaftStateBuilder.raftState()
ExposedRaftState raftState = RaftStateBuilder.raftState()
.votingMembers( member( 1 ) )
.leaderCommit( 0 )
.build();
.build().copy();

RaftMachine raft = mock( RaftMachine.class );
when( raft.state() ).thenReturn( raftState );

CompletableFuture<Boolean> future = waiter.waitUntilCaughtUpMember( raftState );
CompletableFuture<Boolean> future = waiter.waitUntilCaughtUpMember( raft );
jobScheduler.runJob();
jobScheduler.runJob();

Expand All @@ -109,12 +116,15 @@ public void shouldTimeoutIfMemberButNotCaughtUp() throws Exception
MembershipWaiter waiter = new MembershipWaiter( member( 0 ), jobScheduler, () -> dbHealth, 1,
NullLogProvider.getInstance() );

RaftState raftState = RaftStateBuilder.raftState()
ExposedRaftState raftState = RaftStateBuilder.raftState()
.votingMembers( member( 0 ), member( 1 ) )
.leaderCommit( 0 )
.build();
.build().copy();

RaftMachine raft = mock( RaftMachine.class );
when( raft.state() ).thenReturn( raftState );

CompletableFuture<Boolean> future = waiter.waitUntilCaughtUpMember( raftState );
CompletableFuture<Boolean> future = waiter.waitUntilCaughtUpMember( raft );
jobScheduler.runJob();
jobScheduler.runJob();

Expand Down

0 comments on commit 4931258

Please sign in to comment.