Skip to content

Commit

Permalink
Renames TimeoutService to RenewableTimeoutService
Browse files Browse the repository at this point in the history
  • Loading branch information
digitalstain committed Dec 1, 2015
1 parent 4e9a5fa commit 5a2af79
Show file tree
Hide file tree
Showing 14 changed files with 225 additions and 169 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,11 @@

import static org.neo4j.kernel.impl.util.JobScheduler.SchedulingStrategy.POOLED;

public class ScheduledTimeoutService extends LifecycleAdapter implements Runnable, TimeoutService
public class DelayedRenewableTimeoutService extends LifecycleAdapter implements Runnable, RenewableTimeoutService
{
public static final int TIMER_RESOLUTION = 1;
public static final TimeUnit TIMER_RESOLUTION_UNIT = TimeUnit.MILLISECONDS;

/**
* Sorted by next-to-trigger.
*/
Expand All @@ -50,28 +53,33 @@ public class ScheduledTimeoutService extends LifecycleAdapter implements Runnabl
private final JobScheduler scheduler;
private JobScheduler.JobHandle jobHandle;

public ScheduledTimeoutService()
public DelayedRenewableTimeoutService()
{
this( Clock.SYSTEM_CLOCK );
}

public DelayedRenewableTimeoutService( Clock clock )
{
this.clock = Clock.SYSTEM_CLOCK;
this.clock = clock;
this.random = new Random( nanoTime() );
this.scheduler = new Neo4jJobScheduler();
}

/**
* Set up a new timeout. The attachment is optional data to pass along to the trigger, and can be set to Object
* and null if you don't care about it.
* <p/>
* The randomRange attribute allows you to introduce a bit of arbitrariness in when the timeout is triggered, which
* <p>
* The randomRangeInMillis attribute allows you to introduce a bit of arbitrariness in when the timeout is triggered, which
* is a useful way to avoid "thundering herds" when multiple timeouts are likely to trigger at the same time.
* <p/>
* If you don't want randomness, set randomRange to 0.
* <p>
* If you don't want randomness, set randomRangeInMillis to 0.
*/
@Override
public Timeout create( TimeoutName name, long milliseconds, long randomRange, TimeoutHandler handler )
public Timeout create( TimeoutName name, long delayInMillis, long randomRangeInMillis, TimeoutHandler handler )
{
ScheduledTimeout timeout = new ScheduledTimeout(
calcTimeoutTimestamp( milliseconds, randomRange ),
milliseconds, randomRange, handler, this );
calcTimeoutTimestamp( delayInMillis, randomRangeInMillis ),
delayInMillis, randomRangeInMillis, handler, this );

synchronized ( timeouts )
{
Expand Down Expand Up @@ -153,14 +161,15 @@ public void init() throws Throwable
@Override
public void start() throws Throwable
{
jobHandle = scheduler.scheduleRecurring( new JobScheduler.Group( "Scheduler", POOLED ), this, 1,
TimeUnit.MILLISECONDS );
jobHandle = scheduler.scheduleRecurring( new JobScheduler.Group( "Scheduler", POOLED ), this, TIMER_RESOLUTION,
TIMER_RESOLUTION_UNIT );
}

@Override
public void stop() throws Throwable
{
jobHandle.cancel( false );
scheduler.stop();
scheduler.shutdown();
}

Expand All @@ -171,11 +180,11 @@ public static class ScheduledTimeout implements Timeout, Comparable<ScheduledTim
private final long timeoutLength;
private final long randomRange;
private final TimeoutHandler handler;
private final ScheduledTimeoutService timeouts;
private final DelayedRenewableTimeoutService timeouts;
private long timeoutTimestampMillis;

public ScheduledTimeout( long timeoutTimestampMillis, long timeoutLength, long randomRange, TimeoutHandler
handler, ScheduledTimeoutService timeouts )
handler, DelayedRenewableTimeoutService timeouts )
{
this.timeoutTimestampMillis = timeoutTimestampMillis;
this.timeoutLength = timeoutLength;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@
*/
public class RaftInstance<MEMBER> implements LeaderLocator<MEMBER>, Inbound.MessageHandler
{
public enum Timeouts implements TimeoutService.TimeoutName
public enum Timeouts implements RenewableTimeoutService.TimeoutName
{
ELECTION, HEARTBEAT
}
Expand All @@ -78,33 +78,33 @@ public enum Timeouts implements TimeoutService.TimeoutName
private final MEMBER myself;
private final RaftLog entryLog;

private final TimeoutService timeoutService;
private final RenewableTimeoutService renewableTimeoutService;
private final long heartbeatInterval;
private TimeoutService.Timeout electionTimer;
private RenewableTimeoutService.Timeout electionTimer;
private RaftMembershipManager<MEMBER> membershipManager;

private final long electionTimeout;
private final long leaderWaitTimeout;

private final Outbound<MEMBER> outbound;
private final Log log;
volatile boolean handlingMessage = false;
private volatile boolean handlingMessage = false;
private Role currentRole = Role.FOLLOWER;

private RaftLogShippingManager<MEMBER> logShipping;
private Set<Listener<LeadershipChange<MEMBER>>> leadershipChangeListeners = new HashSet<>();

public RaftInstance( MEMBER myself, TermStore termStore, VoteStore<MEMBER> voteStore, RaftLog entryLog, long electionTimeout,
long heartbeatInterval, TimeoutService timeoutService, final Inbound inbound, final Outbound<MEMBER> outbound,
long leaderWaitTimeout, LogProvider logProvider, RaftMembershipManager<MEMBER> membershipManager,
RaftLogShippingManager<MEMBER> logShipping )
long heartbeatInterval, RenewableTimeoutService renewableTimeoutService, final Inbound inbound, final Outbound<MEMBER> outbound,
long leaderWaitTimeout, LogProvider logProvider, RaftMembershipManager<MEMBER> membershipManager,
RaftLogShippingManager<MEMBER> logShipping )
{
this.myself = myself;
this.entryLog = entryLog;
this.electionTimeout = electionTimeout;
this.heartbeatInterval = heartbeatInterval;

this.timeoutService = timeoutService;
this.renewableTimeoutService = renewableTimeoutService;

this.leaderWaitTimeout = leaderWaitTimeout;
this.outbound = outbound;
Expand All @@ -122,12 +122,12 @@ public RaftInstance( MEMBER myself, TermStore termStore, VoteStore<MEMBER> voteS

private void initTimers()
{
electionTimer = timeoutService.create(
electionTimer = renewableTimeoutService.create(
Timeouts.ELECTION, electionTimeout, randomTimeoutRange(), timeout -> {
handle( new RaftMessages.Timeout.Election<>( myself ) );
timeout.renew();
} );
timeoutService.create(
renewableTimeoutService.create(
Timeouts.HEARTBEAT, heartbeatInterval, 0, timeout -> {
handle( new RaftMessages.Timeout.Heartbeat<>( myself ) );
timeout.renew();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public class RaftInstanceBuilder<MEMBER>
private TermStore termStore = new InMemoryTermStore();
private VoteStore<MEMBER> voteStore = new InMemoryVoteStore<>();
private RaftLog raftLog = new InMemoryRaftLog();
private TimeoutService timeoutService = new ScheduledTimeoutService();
private RenewableTimeoutService renewableTimeoutService = new DelayedRenewableTimeoutService();

private Inbound inbound = handler -> {};
private Outbound<MEMBER> outbound = ( advertisedSocketAddress, messages ) -> {};
Expand Down Expand Up @@ -73,12 +73,12 @@ public RaftInstance<MEMBER> build()
RaftLogShippingManager<MEMBER> logShipping = new RaftLogShippingManager<>( outbound, logProvider, raftLog, clock, member, membershipManager, retryTimeMillis );

return new RaftInstance<>( member, termStore, voteStore, raftLog, electionTimeout, heartbeatInterval,
timeoutService, inbound, outbound, leaderWaitTimeout, logProvider, membershipManager, logShipping );
renewableTimeoutService, inbound, outbound, leaderWaitTimeout, logProvider, membershipManager, logShipping );
}

public RaftInstanceBuilder<MEMBER> timeoutService( TimeoutService timeoutService )
public RaftInstanceBuilder<MEMBER> timeoutService( RenewableTimeoutService renewableTimeoutService )
{
this.timeoutService = timeoutService;
this.renewableTimeoutService = renewableTimeoutService;
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@
*/
package org.neo4j.coreedge.raft;

public interface TimeoutService
/**
* A service for creating {@link RenewableTimeoutService.Timeout} instances.
*/
public interface RenewableTimeoutService
{
Timeout create( TimeoutName timeoutName, long milliseconds, long randomRange, TimeoutHandler handler );

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,16 @@
import org.neo4j.coreedge.raft.log.RaftStorageException;
import org.neo4j.coreedge.raft.log.ReadableRaftLog;
import org.neo4j.coreedge.raft.net.Outbound;
import org.neo4j.coreedge.raft.ScheduledTimeoutService;
import org.neo4j.coreedge.raft.TimeoutService.TimeoutName;
import org.neo4j.coreedge.raft.DelayedRenewableTimeoutService;
import org.neo4j.coreedge.raft.RenewableTimeoutService.TimeoutName;
import org.neo4j.helpers.Clock;
import org.neo4j.logging.Log;
import org.neo4j.logging.LogProvider;

import static java.lang.Long.max;
import static java.lang.Long.min;
import static java.lang.String.format;
import static org.neo4j.coreedge.raft.TimeoutService.*;
import static org.neo4j.coreedge.raft.RenewableTimeoutService.*;

/// Optimizations
// TODO: Have several outstanding batches in catchup mode, to bridge the latency gap.
Expand Down Expand Up @@ -97,7 +97,7 @@ enum Mode
private final MEMBER follower;
private final MEMBER leader;

private ScheduledTimeoutService timeoutService;
private DelayedRenewableTimeoutService timeoutService;
private final TimeoutName timeoutName = () -> "RESEND";
private final long retryTimeMillis;
private Timeout timeout;
Expand Down Expand Up @@ -135,7 +135,7 @@ public synchronized void start()

try
{
timeoutService = new ScheduledTimeoutService();
timeoutService = new DelayedRenewableTimeoutService();
timeoutService.init();
timeoutService.start();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
import org.neo4j.coreedge.raft.RaftServer;
import org.neo4j.coreedge.raft.replication.id.ReplicatedIdGeneratorFactory;
import org.neo4j.coreedge.server.CoreMember;
import org.neo4j.coreedge.raft.ScheduledTimeoutService;
import org.neo4j.coreedge.raft.DelayedRenewableTimeoutService;
import org.neo4j.kernel.impl.transaction.state.DataSourceManager;
import org.neo4j.kernel.lifecycle.LifeSupport;
import org.neo4j.kernel.lifecycle.Lifecycle;
Expand All @@ -50,7 +50,7 @@ public CoreServerStartupProcess( LocalDatabase localDatabase, DataSourceManager
ReplicatedIdGeneratorFactory idGeneratorFactory,
RaftInstance<CoreMember> raft, RaftLog raftLog, RaftServer<CoreMember> raftServer,
CatchupServer catchupServer,
ScheduledTimeoutService raftTimeoutService,
DelayedRenewableTimeoutService raftTimeoutService,
MembershipWaiter<CoreMember> membershipWaiter,
long joinCatchupTimeout )
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@
import org.neo4j.coreedge.server.ListenSocketAddress;
import org.neo4j.coreedge.server.logging.BetterMessageLogger;
import org.neo4j.coreedge.server.logging.MessageLogger;
import org.neo4j.coreedge.raft.ScheduledTimeoutService;
import org.neo4j.coreedge.raft.DelayedRenewableTimeoutService;
import org.neo4j.graphdb.DependencyResolver;
import org.neo4j.graphdb.factory.GraphDatabaseSettings;
import org.neo4j.io.fs.DefaultFileSystemAbstraction;
Expand Down Expand Up @@ -167,7 +167,7 @@ public EnterpriseCoreEditionModule( final PlatformModule platformModule,
ListenSocketAddress raftListenAddress = config.get( CoreEdgeClusterSettings.raft_listen_address );
RaftServer<CoreMember> raftServer = new RaftServer<>( marshall, raftListenAddress, logProvider );

final ScheduledTimeoutService raftTimeoutService = new ScheduledTimeoutService();
final DelayedRenewableTimeoutService raftTimeoutService = new DelayedRenewableTimeoutService();

File raftLogsDirectory = createRaftLogsDirectory( platformModule.storeDir, fileSystem );
NaiveDurableRaftLog raftLog = new NaiveDurableRaftLog( fileSystem, raftLogsDirectory, new RaftContentSerializer(),
Expand Down Expand Up @@ -319,7 +319,7 @@ private static RaftInstance<CoreMember> createRaft( LifeSupport life,
CoreMember myself,
LogProvider logProvider,
RaftServer<CoreMember> raftServer,
ScheduledTimeoutService raftTimeoutService )
DelayedRenewableTimeoutService raftTimeoutService )
{
LoggingInbound loggingRaftInbound = new LoggingInbound( raftServer, messageLogger, myself.getRaftAddress() );

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
import org.neo4j.coreedge.catchup.storecopy.LocalDatabase;
import org.neo4j.coreedge.server.CoreMember;
import org.neo4j.coreedge.server.core.CoreServerStartupProcess;
import org.neo4j.coreedge.raft.ScheduledTimeoutService;
import org.neo4j.coreedge.raft.DelayedRenewableTimeoutService;
import org.neo4j.kernel.impl.transaction.state.DataSourceManager;

import static org.mockito.Matchers.any;
Expand All @@ -53,7 +53,7 @@ public void startShouldDeleteStoreAndStartNewDatabase() throws Throwable
RaftLog raftLog = mock( RaftLog.class );
RaftServer<CoreMember> raftServer = mock( RaftServer.class );
CatchupServer catchupServer = mock( CatchupServer.class );
ScheduledTimeoutService timeoutService = mock( ScheduledTimeoutService.class );
DelayedRenewableTimeoutService timeoutService = mock( DelayedRenewableTimeoutService.class );
MembershipWaiter<CoreMember> membershipWaiter = mock( MembershipWaiter.class );
when(membershipWaiter.waitUntilCaughtUpMember( any(ReadableRaftState.class) ))
.thenReturn( mock( CompletableFuture.class ) );
Expand Down Expand Up @@ -81,7 +81,7 @@ public void stopShouldStopDatabase() throws Throwable
RaftLog raftLog = mock( RaftLog.class );
RaftServer<CoreMember> raftServer = mock( RaftServer.class );
CatchupServer catchupServer = mock( CatchupServer.class );
ScheduledTimeoutService timeoutService = mock( ScheduledTimeoutService.class );
DelayedRenewableTimeoutService timeoutService = mock( DelayedRenewableTimeoutService.class );
MembershipWaiter<CoreMember> membershipListener = mock( MembershipWaiter.class );
when(membershipListener.waitUntilCaughtUpMember( any(ReadableRaftState.class) )).thenReturn( mock(CompletableFuture.class) );

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@

import static org.mockito.Mockito.mock;

public class ControlledTimeoutService implements TimeoutService
public class ControlledRenewableTimeoutService implements RenewableTimeoutService
{
private Map<TimeoutName, Pair<TimeoutHandler, Timeout>> handlers = new HashMap<>();

Expand Down

0 comments on commit 5a2af79

Please sign in to comment.