Skip to content

Commit

Permalink
Merge pull request #7143 from jimwebber/3.1-clocks-update
Browse files Browse the repository at this point in the history
Core-edge is updated to the new Java 8 clocks and associated fakes.
  • Loading branch information
mneedham committed May 17, 2016
2 parents 00ce89a + 92deea9 commit 1686964
Show file tree
Hide file tree
Showing 18 changed files with 177 additions and 163 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
*/
package org.neo4j.coreedge.raft;

import java.time.Clock;
import java.util.Collection;
import java.util.LinkedList;
import java.util.Queue;
Expand All @@ -29,7 +30,6 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

import org.neo4j.helpers.Clock;
import org.neo4j.kernel.impl.util.JobScheduler;
import org.neo4j.kernel.impl.util.Neo4jJobScheduler;
import org.neo4j.kernel.lifecycle.LifecycleAdapter;
Expand All @@ -50,8 +50,8 @@
*/
public class DelayedRenewableTimeoutService extends LifecycleAdapter implements Runnable, RenewableTimeoutService
{
public static final int TIMER_RESOLUTION = 1;
public static final TimeUnit TIMER_RESOLUTION_UNIT = TimeUnit.MILLISECONDS;
private static final int TIMER_RESOLUTION = 1;
private static final TimeUnit TIMER_RESOLUTION_UNIT = TimeUnit.MILLISECONDS;

/**
* Sorted by next-to-trigger.
Expand Down Expand Up @@ -96,12 +96,12 @@ public RenewableTimeout create( TimeoutName name, long delayInMillis, long rando
return timeout;
}

public void renew( ScheduledRenewableTimeout timeout )
private void renew( ScheduledRenewableTimeout timeout )
{
pendingRenewals.offer( timeout );
}

public void cancel( ScheduledRenewableTimeout timeout )
private void cancel( ScheduledRenewableTimeout timeout )
{
synchronized ( timeouts )
{
Expand All @@ -112,15 +112,15 @@ public void cancel( ScheduledRenewableTimeout timeout )
private long calcTimeoutTimestamp( long milliseconds, long randomRange )
{
int randomness = randomRange != 0 ? random.nextInt( (int) randomRange ) : 0;
return clock.currentTimeMillis() + milliseconds + randomness;
return clock.millis() + milliseconds + randomness;
}

@Override
public synchronized void run()
{
try
{
long now = clock.currentTimeMillis();
long now = clock.millis();
Collection<ScheduledRenewableTimeout> triggered = new LinkedList<>();

synchronized ( timeouts )
Expand Down Expand Up @@ -187,7 +187,7 @@ public void stop() throws Throwable
scheduler.shutdown();
}

public static class ScheduledRenewableTimeout implements RenewableTimeout, Comparable<ScheduledRenewableTimeout>
static class ScheduledRenewableTimeout implements RenewableTimeout, Comparable<ScheduledRenewableTimeout>
{
private static final AtomicLong idGen = new AtomicLong();
private final long id = idGen.getAndIncrement();
Expand All @@ -197,7 +197,7 @@ public static class ScheduledRenewableTimeout implements RenewableTimeout, Compa
private final DelayedRenewableTimeoutService timeouts;
private long timeoutTimestampMillis;

public ScheduledRenewableTimeout( long timeoutTimestampMillis, long timeoutLength, long randomRange, TimeoutHandler
ScheduledRenewableTimeout( long timeoutTimestampMillis, long timeoutLength, long randomRange, TimeoutHandler
handler, DelayedRenewableTimeoutService timeouts )
{
this.timeoutTimestampMillis = timeoutTimestampMillis;
Expand Down Expand Up @@ -229,22 +229,22 @@ public void cancel()
timeouts.cancel( this );
}

public void setTimeoutTimestamp( long newTimestamp )
void setTimeoutTimestamp( long newTimestamp )
{
this.timeoutTimestampMillis = newTimestamp;
}

@Override
public int compareTo( ScheduledRenewableTimeout o )
public int compareTo( ScheduledRenewableTimeout renewableTimeout )
{
if ( timeoutTimestampMillis == o.timeoutTimestampMillis )
if ( timeoutTimestampMillis == renewableTimeout.timeoutTimestampMillis )
{
// Timeouts are set to trigger at the same time.
// Order them by id instead.
return (int) (id - o.id);
return (int) (id - renewableTimeout.id);
}

return (int) (timeoutTimestampMillis - o.timeoutTimestampMillis);
return (int) (timeoutTimestampMillis - renewableTimeout.timeoutTimestampMillis);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,12 @@
*/
package org.neo4j.coreedge.raft.membership;

import java.time.Clock;

import org.neo4j.coreedge.raft.log.ReadableRaftLog;
import org.neo4j.coreedge.raft.state.follower.FollowerState;
import org.neo4j.helpers.Clock;

public class CatchupGoal
class CatchupGoal
{
private static final long MAX_ROUNDS = 10;

Expand All @@ -35,13 +36,13 @@ public class CatchupGoal
private long roundCount;
private long startTime;

public CatchupGoal( ReadableRaftLog raftLog, Clock clock, long electionTimeout )
CatchupGoal( ReadableRaftLog raftLog, Clock clock, long electionTimeout )
{
this.raftLog = raftLog;
this.clock = clock;
this.electionTimeout = electionTimeout;
this.targetIndex = raftLog.appendIndex();
this.startTime = clock.currentTimeMillis();
this.startTime = clock.millis();

this.roundCount = 1;
}
Expand All @@ -50,14 +51,14 @@ boolean achieved( FollowerState followerState )
{
if ( followerState.getMatchIndex() >= targetIndex )
{
if ( (clock.currentTimeMillis() - startTime) <= electionTimeout )
if ( (clock.millis() - startTime) <= electionTimeout )
{
return true;
}
else if ( roundCount < MAX_ROUNDS )
{
roundCount++;
startTime = clock.currentTimeMillis();
startTime = clock.millis();
targetIndex = raftLog.appendIndex();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,14 @@
*/
package org.neo4j.coreedge.raft.membership;

import java.time.Clock;

import org.neo4j.coreedge.raft.log.ReadableRaftLog;
import org.neo4j.coreedge.raft.state.follower.FollowerState;
import org.neo4j.helpers.Clock;

public class CatchupGoalTracker
class CatchupGoalTracker
{
public static final long MAX_ROUNDS = 10;
static final long MAX_ROUNDS = 10;

private final ReadableRaftLog raftLog;
private final Clock clock;
Expand All @@ -40,15 +41,15 @@ public class CatchupGoalTracker
private boolean finished;
private boolean goalAchieved;

public CatchupGoalTracker( ReadableRaftLog raftLog, Clock clock, long roundTimeout, long catchupTimeout )
CatchupGoalTracker( ReadableRaftLog raftLog, Clock clock, long roundTimeout, long catchupTimeout )
{
this.raftLog = raftLog;
this.clock = clock;
this.roundTimeout = roundTimeout;
this.catchupTimeout = catchupTimeout;
this.targetIndex = raftLog.appendIndex();
this.startTime = clock.currentTimeMillis();
this.roundStartTime = clock.currentTimeMillis();
this.startTime = clock.millis();
this.roundStartTime = clock.millis();

this.roundCount = 1;
}
Expand All @@ -61,12 +62,12 @@ void updateProgress( FollowerState followerState )
}

boolean achievedTarget = followerState.getMatchIndex() >= targetIndex;
if ( achievedTarget && (clock.currentTimeMillis() - roundStartTime) <= roundTimeout )
if ( achievedTarget && (clock.millis() - roundStartTime) <= roundTimeout )
{
goalAchieved = true;
finished = true;
}
else if ( clock.currentTimeMillis() > (startTime + catchupTimeout) )
else if ( clock.millis() > (startTime + catchupTimeout) )
{
finished = true;
}
Expand All @@ -75,7 +76,7 @@ else if ( achievedTarget )
if( roundCount < MAX_ROUNDS )
{
roundCount++;
roundStartTime = clock.currentTimeMillis();
roundStartTime = clock.millis();
targetIndex = raftLog.appendIndex();
}
else
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.neo4j.coreedge.raft.membership;

import java.io.IOException;
import java.time.Clock;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
Expand All @@ -39,7 +40,6 @@
import org.neo4j.coreedge.raft.state.StateStorage;
import org.neo4j.coreedge.raft.state.follower.FollowerStates;
import org.neo4j.coreedge.raft.state.membership.RaftMembershipState;
import org.neo4j.helpers.Clock;
import org.neo4j.helpers.collection.Pair;
import org.neo4j.logging.Log;
import org.neo4j.logging.LogProvider;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,14 @@
*/
package org.neo4j.coreedge.raft.membership;

import java.time.Clock;
import java.util.HashSet;
import java.util.Set;

import org.neo4j.coreedge.raft.log.ReadableRaftLog;
import org.neo4j.coreedge.raft.roles.Role;
import org.neo4j.coreedge.raft.state.follower.FollowerStates;
import org.neo4j.coreedge.raft.state.membership.RaftMembershipState;
import org.neo4j.helpers.Clock;
import org.neo4j.logging.Log;
import org.neo4j.logging.LogProvider;

Expand Down Expand Up @@ -58,7 +58,7 @@
*
* Only a single member change is handled at a time.
*/
public class RaftMembershipStateMachine<MEMBER>
class RaftMembershipStateMachine<MEMBER>
{
private final Log log;
public RaftMembershipStateMachineEventHandler<MEMBER> state = new Inactive();
Expand All @@ -73,9 +73,9 @@ public class RaftMembershipStateMachine<MEMBER>

private MEMBER catchingUpMember;

public RaftMembershipStateMachine( ReadableRaftLog raftLog, Clock clock, long electionTimeout,
MembershipDriver<MEMBER> membershipDriver, LogProvider logProvider,
long catchupTimeout, RaftMembershipState<MEMBER> membershipState )
RaftMembershipStateMachine( ReadableRaftLog raftLog, Clock clock, long electionTimeout,
MembershipDriver<MEMBER> membershipDriver, LogProvider logProvider,
long catchupTimeout, RaftMembershipState<MEMBER> membershipState )
{
this.raftLog = raftLog;
this.clock = clock;
Expand All @@ -101,37 +101,37 @@ private synchronized void handleState( RaftMembershipStateMachineEventHandler<ME
}
}

public void onRole( Role role )
void onRole( Role role )
{
handleState( state.onRole( role ) );
}

public void onRaftGroupCommitted()
void onRaftGroupCommitted()
{
handleState( state.onRaftGroupCommitted() );
}

public void onFollowerStateChange( FollowerStates<MEMBER> followerStates )
void onFollowerStateChange( FollowerStates<MEMBER> followerStates )
{
handleState( state.onFollowerStateChange( followerStates ) );
}

public void onMissingMember( MEMBER member )
void onMissingMember( MEMBER member )
{
handleState( state.onMissingMember( member ) );
}

public void onSuperfluousMember( MEMBER member )
void onSuperfluousMember( MEMBER member )
{
handleState( state.onSuperfluousMember( member ) );
}

public void onTargetChanged( Set<MEMBER> targetMembers )
void onTargetChanged( Set<MEMBER> targetMembers )
{
handleState( state.onTargetChanged( targetMembers ) );
}

public class Inactive extends RaftMembershipStateMachineEventHandler.Adapter<MEMBER>
private class Inactive extends RaftMembershipStateMachineEventHandler.Adapter<MEMBER>
{
@Override
public RaftMembershipStateMachineEventHandler<MEMBER> onRole( Role role )
Expand All @@ -157,7 +157,7 @@ public String toString()
}
}

public abstract class ActiveBaseState extends RaftMembershipStateMachineEventHandler.Adapter<MEMBER>
abstract class ActiveBaseState extends RaftMembershipStateMachineEventHandler.Adapter<MEMBER>
{
@Override
public RaftMembershipStateMachineEventHandler<MEMBER> onRole( Role role )
Expand All @@ -173,7 +173,7 @@ public RaftMembershipStateMachineEventHandler<MEMBER> onRole( Role role )
}
}

public class Idle extends ActiveBaseState
private class Idle extends ActiveBaseState
{
@Override
public RaftMembershipStateMachineEventHandler<MEMBER> onMissingMember( MEMBER member )
Expand All @@ -198,12 +198,12 @@ public String toString()
}
}

public class CatchingUp extends ActiveBaseState
private class CatchingUp extends ActiveBaseState
{
private final CatchupGoalTracker catchupGoalTracker;
boolean movingToConsensus;

public CatchingUp( MEMBER member )
CatchingUp( MEMBER member )
{
this.catchupGoalTracker = new CatchupGoalTracker( raftLog, clock, electionTimeout, catchupTimeout );
catchingUpMember = member;
Expand Down

0 comments on commit 1686964

Please sign in to comment.