Skip to content

Commit

Permalink
Core-edge is updated to the new Java 8 clocks and associated fakes.
Browse files Browse the repository at this point in the history
  • Loading branch information
jimwebber committed May 17, 2016
1 parent b72199f commit 92deea9
Show file tree
Hide file tree
Showing 18 changed files with 177 additions and 163 deletions.
Expand Up @@ -19,6 +19,7 @@
*/
package org.neo4j.coreedge.raft;

import java.time.Clock;
import java.util.Collection;
import java.util.LinkedList;
import java.util.Queue;
Expand All @@ -29,7 +30,6 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

import org.neo4j.helpers.Clock;
import org.neo4j.kernel.impl.util.JobScheduler;
import org.neo4j.kernel.impl.util.Neo4jJobScheduler;
import org.neo4j.kernel.lifecycle.LifecycleAdapter;
Expand All @@ -50,8 +50,8 @@
*/
public class DelayedRenewableTimeoutService extends LifecycleAdapter implements Runnable, RenewableTimeoutService
{
public static final int TIMER_RESOLUTION = 1;
public static final TimeUnit TIMER_RESOLUTION_UNIT = TimeUnit.MILLISECONDS;
private static final int TIMER_RESOLUTION = 1;
private static final TimeUnit TIMER_RESOLUTION_UNIT = TimeUnit.MILLISECONDS;

/**
* Sorted by next-to-trigger.
Expand Down Expand Up @@ -96,12 +96,12 @@ public RenewableTimeout create( TimeoutName name, long delayInMillis, long rando
return timeout;
}

public void renew( ScheduledRenewableTimeout timeout )
private void renew( ScheduledRenewableTimeout timeout )
{
pendingRenewals.offer( timeout );
}

public void cancel( ScheduledRenewableTimeout timeout )
private void cancel( ScheduledRenewableTimeout timeout )
{
synchronized ( timeouts )
{
Expand All @@ -112,15 +112,15 @@ public void cancel( ScheduledRenewableTimeout timeout )
private long calcTimeoutTimestamp( long milliseconds, long randomRange )
{
int randomness = randomRange != 0 ? random.nextInt( (int) randomRange ) : 0;
return clock.currentTimeMillis() + milliseconds + randomness;
return clock.millis() + milliseconds + randomness;
}

@Override
public synchronized void run()
{
try
{
long now = clock.currentTimeMillis();
long now = clock.millis();
Collection<ScheduledRenewableTimeout> triggered = new LinkedList<>();

synchronized ( timeouts )
Expand Down Expand Up @@ -187,7 +187,7 @@ public void stop() throws Throwable
scheduler.shutdown();
}

public static class ScheduledRenewableTimeout implements RenewableTimeout, Comparable<ScheduledRenewableTimeout>
static class ScheduledRenewableTimeout implements RenewableTimeout, Comparable<ScheduledRenewableTimeout>
{
private static final AtomicLong idGen = new AtomicLong();
private final long id = idGen.getAndIncrement();
Expand All @@ -197,7 +197,7 @@ public static class ScheduledRenewableTimeout implements RenewableTimeout, Compa
private final DelayedRenewableTimeoutService timeouts;
private long timeoutTimestampMillis;

public ScheduledRenewableTimeout( long timeoutTimestampMillis, long timeoutLength, long randomRange, TimeoutHandler
ScheduledRenewableTimeout( long timeoutTimestampMillis, long timeoutLength, long randomRange, TimeoutHandler
handler, DelayedRenewableTimeoutService timeouts )
{
this.timeoutTimestampMillis = timeoutTimestampMillis;
Expand Down Expand Up @@ -229,22 +229,22 @@ public void cancel()
timeouts.cancel( this );
}

public void setTimeoutTimestamp( long newTimestamp )
void setTimeoutTimestamp( long newTimestamp )
{
this.timeoutTimestampMillis = newTimestamp;
}

@Override
public int compareTo( ScheduledRenewableTimeout o )
public int compareTo( ScheduledRenewableTimeout renewableTimeout )
{
if ( timeoutTimestampMillis == o.timeoutTimestampMillis )
if ( timeoutTimestampMillis == renewableTimeout.timeoutTimestampMillis )
{
// Timeouts are set to trigger at the same time.
// Order them by id instead.
return (int) (id - o.id);
return (int) (id - renewableTimeout.id);
}

return (int) (timeoutTimestampMillis - o.timeoutTimestampMillis);
return (int) (timeoutTimestampMillis - renewableTimeout.timeoutTimestampMillis);
}

@Override
Expand Down
Expand Up @@ -19,11 +19,12 @@
*/
package org.neo4j.coreedge.raft.membership;

import java.time.Clock;

import org.neo4j.coreedge.raft.log.ReadableRaftLog;
import org.neo4j.coreedge.raft.state.follower.FollowerState;
import org.neo4j.helpers.Clock;

public class CatchupGoal
class CatchupGoal
{
private static final long MAX_ROUNDS = 10;

Expand All @@ -35,13 +36,13 @@ public class CatchupGoal
private long roundCount;
private long startTime;

public CatchupGoal( ReadableRaftLog raftLog, Clock clock, long electionTimeout )
CatchupGoal( ReadableRaftLog raftLog, Clock clock, long electionTimeout )
{
this.raftLog = raftLog;
this.clock = clock;
this.electionTimeout = electionTimeout;
this.targetIndex = raftLog.appendIndex();
this.startTime = clock.currentTimeMillis();
this.startTime = clock.millis();

this.roundCount = 1;
}
Expand All @@ -50,14 +51,14 @@ boolean achieved( FollowerState followerState )
{
if ( followerState.getMatchIndex() >= targetIndex )
{
if ( (clock.currentTimeMillis() - startTime) <= electionTimeout )
if ( (clock.millis() - startTime) <= electionTimeout )
{
return true;
}
else if ( roundCount < MAX_ROUNDS )
{
roundCount++;
startTime = clock.currentTimeMillis();
startTime = clock.millis();
targetIndex = raftLog.appendIndex();
}
}
Expand Down
Expand Up @@ -19,13 +19,14 @@
*/
package org.neo4j.coreedge.raft.membership;

import java.time.Clock;

import org.neo4j.coreedge.raft.log.ReadableRaftLog;
import org.neo4j.coreedge.raft.state.follower.FollowerState;
import org.neo4j.helpers.Clock;

public class CatchupGoalTracker
class CatchupGoalTracker
{
public static final long MAX_ROUNDS = 10;
static final long MAX_ROUNDS = 10;

private final ReadableRaftLog raftLog;
private final Clock clock;
Expand All @@ -40,15 +41,15 @@ public class CatchupGoalTracker
private boolean finished;
private boolean goalAchieved;

public CatchupGoalTracker( ReadableRaftLog raftLog, Clock clock, long roundTimeout, long catchupTimeout )
CatchupGoalTracker( ReadableRaftLog raftLog, Clock clock, long roundTimeout, long catchupTimeout )
{
this.raftLog = raftLog;
this.clock = clock;
this.roundTimeout = roundTimeout;
this.catchupTimeout = catchupTimeout;
this.targetIndex = raftLog.appendIndex();
this.startTime = clock.currentTimeMillis();
this.roundStartTime = clock.currentTimeMillis();
this.startTime = clock.millis();
this.roundStartTime = clock.millis();

this.roundCount = 1;
}
Expand All @@ -61,12 +62,12 @@ void updateProgress( FollowerState followerState )
}

boolean achievedTarget = followerState.getMatchIndex() >= targetIndex;
if ( achievedTarget && (clock.currentTimeMillis() - roundStartTime) <= roundTimeout )
if ( achievedTarget && (clock.millis() - roundStartTime) <= roundTimeout )
{
goalAchieved = true;
finished = true;
}
else if ( clock.currentTimeMillis() > (startTime + catchupTimeout) )
else if ( clock.millis() > (startTime + catchupTimeout) )
{
finished = true;
}
Expand All @@ -75,7 +76,7 @@ else if ( achievedTarget )
if( roundCount < MAX_ROUNDS )
{
roundCount++;
roundStartTime = clock.currentTimeMillis();
roundStartTime = clock.millis();
targetIndex = raftLog.appendIndex();
}
else
Expand Down
Expand Up @@ -20,6 +20,7 @@
package org.neo4j.coreedge.raft.membership;

import java.io.IOException;
import java.time.Clock;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
Expand All @@ -39,7 +40,6 @@
import org.neo4j.coreedge.raft.state.StateStorage;
import org.neo4j.coreedge.raft.state.follower.FollowerStates;
import org.neo4j.coreedge.raft.state.membership.RaftMembershipState;
import org.neo4j.helpers.Clock;
import org.neo4j.helpers.collection.Pair;
import org.neo4j.logging.Log;
import org.neo4j.logging.LogProvider;
Expand Down
Expand Up @@ -19,14 +19,14 @@
*/
package org.neo4j.coreedge.raft.membership;

import java.time.Clock;
import java.util.HashSet;
import java.util.Set;

import org.neo4j.coreedge.raft.log.ReadableRaftLog;
import org.neo4j.coreedge.raft.roles.Role;
import org.neo4j.coreedge.raft.state.follower.FollowerStates;
import org.neo4j.coreedge.raft.state.membership.RaftMembershipState;
import org.neo4j.helpers.Clock;
import org.neo4j.logging.Log;
import org.neo4j.logging.LogProvider;

Expand Down Expand Up @@ -58,7 +58,7 @@
*
* Only a single member change is handled at a time.
*/
public class RaftMembershipStateMachine<MEMBER>
class RaftMembershipStateMachine<MEMBER>
{
private final Log log;
public RaftMembershipStateMachineEventHandler<MEMBER> state = new Inactive();
Expand All @@ -73,9 +73,9 @@ public class RaftMembershipStateMachine<MEMBER>

private MEMBER catchingUpMember;

public RaftMembershipStateMachine( ReadableRaftLog raftLog, Clock clock, long electionTimeout,
MembershipDriver<MEMBER> membershipDriver, LogProvider logProvider,
long catchupTimeout, RaftMembershipState<MEMBER> membershipState )
RaftMembershipStateMachine( ReadableRaftLog raftLog, Clock clock, long electionTimeout,
MembershipDriver<MEMBER> membershipDriver, LogProvider logProvider,
long catchupTimeout, RaftMembershipState<MEMBER> membershipState )
{
this.raftLog = raftLog;
this.clock = clock;
Expand All @@ -101,37 +101,37 @@ private synchronized void handleState( RaftMembershipStateMachineEventHandler<ME
}
}

public void onRole( Role role )
void onRole( Role role )
{
handleState( state.onRole( role ) );
}

public void onRaftGroupCommitted()
void onRaftGroupCommitted()
{
handleState( state.onRaftGroupCommitted() );
}

public void onFollowerStateChange( FollowerStates<MEMBER> followerStates )
void onFollowerStateChange( FollowerStates<MEMBER> followerStates )
{
handleState( state.onFollowerStateChange( followerStates ) );
}

public void onMissingMember( MEMBER member )
void onMissingMember( MEMBER member )
{
handleState( state.onMissingMember( member ) );
}

public void onSuperfluousMember( MEMBER member )
void onSuperfluousMember( MEMBER member )
{
handleState( state.onSuperfluousMember( member ) );
}

public void onTargetChanged( Set<MEMBER> targetMembers )
void onTargetChanged( Set<MEMBER> targetMembers )
{
handleState( state.onTargetChanged( targetMembers ) );
}

public class Inactive extends RaftMembershipStateMachineEventHandler.Adapter<MEMBER>
private class Inactive extends RaftMembershipStateMachineEventHandler.Adapter<MEMBER>
{
@Override
public RaftMembershipStateMachineEventHandler<MEMBER> onRole( Role role )
Expand All @@ -157,7 +157,7 @@ public String toString()
}
}

public abstract class ActiveBaseState extends RaftMembershipStateMachineEventHandler.Adapter<MEMBER>
abstract class ActiveBaseState extends RaftMembershipStateMachineEventHandler.Adapter<MEMBER>
{
@Override
public RaftMembershipStateMachineEventHandler<MEMBER> onRole( Role role )
Expand All @@ -173,7 +173,7 @@ public RaftMembershipStateMachineEventHandler<MEMBER> onRole( Role role )
}
}

public class Idle extends ActiveBaseState
private class Idle extends ActiveBaseState
{
@Override
public RaftMembershipStateMachineEventHandler<MEMBER> onMissingMember( MEMBER member )
Expand All @@ -198,12 +198,12 @@ public String toString()
}
}

public class CatchingUp extends ActiveBaseState
private class CatchingUp extends ActiveBaseState
{
private final CatchupGoalTracker catchupGoalTracker;
boolean movingToConsensus;

public CatchingUp( MEMBER member )
CatchingUp( MEMBER member )
{
this.catchupGoalTracker = new CatchupGoalTracker( raftLog, clock, electionTimeout, catchupTimeout );
catchingUpMember = member;
Expand Down

0 comments on commit 92deea9

Please sign in to comment.