Skip to content

Commit

Permalink
Improved timer service
Browse files Browse the repository at this point in the history
This replaces the very custom polling timer service with a
simple abstraction working on top of the job scheduler.
  • Loading branch information
martinfurmanski committed Dec 12, 2017
1 parent 1ac08d0 commit 12a72ea
Show file tree
Hide file tree
Showing 36 changed files with 1,378 additions and 955 deletions.
4 changes: 2 additions & 2 deletions community/common/src/main/java/org/neo4j/time/FakeClock.java
Expand Up @@ -32,11 +32,11 @@ public class FakeClock extends SystemNanoClock
{
private long nanoTime = 0;

FakeClock()
protected FakeClock()
{
}

FakeClock( long initialTime, TimeUnit unit )
protected FakeClock( long initialTime, TimeUnit unit )
{
forward( initialTime, unit );
}
Expand Down
Expand Up @@ -21,6 +21,7 @@

import java.util.Collections;
import java.util.Map;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadFactory;
Expand Down Expand Up @@ -167,6 +168,11 @@ class Groups
*/
public static Group storageMaintenance = new Group( "StorageMaintenance", POOLED );

/**
* Raft timers.
*/
public static Group raft = new Group( "RaftTimer", POOLED );

/**
* Native security.
*/
Expand All @@ -182,7 +188,7 @@ interface JobHandle
{
void cancel( boolean mayInterruptIfRunning );

void waitTermination() throws InterruptedException, ExecutionException;
void waitTermination() throws InterruptedException, ExecutionException, CancellationException;
}

/** Expose a group scheduler as an {@link Executor} */
Expand Down
@@ -0,0 +1,240 @@
/*
* Copyright (c) 2002-2017 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.test;

import java.util.Collection;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

import org.neo4j.kernel.impl.util.JobScheduler;
import org.neo4j.time.FakeClock;

/**
* N.B - Do not use this with time resolutions of less than 1 ms!
*/
public class FakeClockJobScheduler extends FakeClock implements JobScheduler
{
private final AtomicLong jobIdGen = new AtomicLong();
private final Collection<JobHandle> jobs = new CopyOnWriteArrayList<>();

public FakeClockJobScheduler()
{
super();
}

private JobHandle schedule( Runnable job, long firstDeadline )
{
JobHandle jobHandle = new JobHandle( job, firstDeadline, 0 );
jobs.add( jobHandle );
return jobHandle;
}

private JobHandle scheduleRecurring( Runnable job, long firstDeadline, long period )
{
JobHandle jobHandle = new JobHandle( job, firstDeadline, period );
jobs.add( jobHandle );
return jobHandle;
}

@Override
public FakeClock forward( long delta, TimeUnit unit )
{
super.forward( delta, unit );
processSchedule();
return this;
}

private void processSchedule()
{
boolean anyTriggered;
do
{
anyTriggered = false;
for ( JobHandle job : jobs )
{
if ( job.tryTrigger() )
{
anyTriggered = true;
}
}
}
while ( anyTriggered );
}

private long now()
{
return instant().toEpochMilli();
}

@Override
public Executor executor( Group group )
{
return job -> schedule( job, now() );
}

@Override
public ThreadFactory threadFactory( Group group )
{
throw new UnsupportedOperationException();
}

@Override
public JobHandle schedule( Group group, Runnable job )
{
JobHandle handle = schedule( job, now() );
processSchedule();
return handle;
}

@Override
public JobHandle schedule( Group group, Runnable job, Map<String,String> metadata )
{
JobHandle handle = schedule( job, now() );
processSchedule();
return handle;
}

@Override
public JobHandle schedule( Group group, Runnable job, long initialDelay, TimeUnit timeUnit )
{
JobHandle handle = schedule( job, now() + timeUnit.toMillis( initialDelay ) );
if ( initialDelay <= 0 )
{
processSchedule();
}
return handle;
}

@Override
public JobHandle scheduleRecurring( Group group, Runnable job, long period, TimeUnit timeUnit )
{
JobHandle handle = scheduleRecurring( job, now(), timeUnit.toMillis( period ) );
processSchedule();
return handle;
}

@Override
public JobHandle scheduleRecurring( Group group, Runnable job, long initialDelay, long period, TimeUnit timeUnit )
{
JobHandle handle = scheduleRecurring( job, now() + timeUnit.toMillis( initialDelay ), timeUnit.toMillis( period ) );
if ( initialDelay <= 0 )
{
processSchedule();
}
return handle;
}

@Override
public void init() throws Throwable
{
throw new UnsupportedOperationException();
}

@Override
public void start() throws Throwable
{
throw new UnsupportedOperationException();
}

@Override
public void stop() throws Throwable
{
throw new UnsupportedOperationException();
}

@Override
public void shutdown() throws Throwable
{
throw new UnsupportedOperationException();
}

class JobHandle implements JobScheduler.JobHandle
{
private final long id = jobIdGen.incrementAndGet();
private final Runnable runnable;
private final long period;

private long deadline;

JobHandle( Runnable runnable, long firstDeadline, long period )
{
this.runnable = runnable;
this.deadline = firstDeadline;
this.period = period;
}

boolean tryTrigger()
{
if ( now() >= deadline )
{
runnable.run();
if ( period != 0 )
{
deadline += period;
}
else
{
jobs.remove( this );
}
return true;
}
return false;
}

@Override
public void cancel( boolean mayInterruptIfRunning )
{
jobs.remove( this );
}

@Override
public void waitTermination() throws InterruptedException, ExecutionException
{
throw new UnsupportedOperationException();
}

@Override
public boolean equals( Object o )
{
if ( this == o )
{
return true;
}
if ( o == null || getClass() != o.getClass() )
{
return false;
}
JobHandle jobHandle = (JobHandle) o;
return id == jobHandle.id;
}

@Override
public int hashCode()
{
return Objects.hash( id );
}
}
}
Expand Up @@ -30,9 +30,9 @@
import org.neo4j.causalclustering.catchup.storecopy.LocalDatabase;
import org.neo4j.causalclustering.catchup.storecopy.StoreCopyFailedException;
import org.neo4j.causalclustering.catchup.storecopy.StreamingTransactionsFailedException;
import org.neo4j.causalclustering.core.consensus.schedule.RenewableTimeoutService;
import org.neo4j.causalclustering.core.consensus.schedule.RenewableTimeoutService.RenewableTimeout;
import org.neo4j.causalclustering.core.consensus.schedule.RenewableTimeoutService.TimeoutName;
import org.neo4j.causalclustering.core.consensus.schedule.Timer;
import org.neo4j.causalclustering.core.consensus.schedule.TimerService;
import org.neo4j.causalclustering.core.consensus.schedule.TimerService.TimerName;
import org.neo4j.causalclustering.identity.MemberId;
import org.neo4j.causalclustering.identity.StoreId;
import org.neo4j.causalclustering.messaging.routing.CoreMemberSelectionException;
Expand All @@ -47,10 +47,13 @@
import org.neo4j.logging.LogProvider;

import static java.lang.String.format;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.neo4j.causalclustering.catchup.tx.CatchupPollingProcess.State.PANIC;
import static org.neo4j.causalclustering.catchup.tx.CatchupPollingProcess.State.STORE_COPYING;
import static org.neo4j.causalclustering.catchup.tx.CatchupPollingProcess.State.TX_PULLING;
import static org.neo4j.causalclustering.catchup.tx.CatchupPollingProcess.Timeouts.TX_PULLER_TIMEOUT;
import static org.neo4j.causalclustering.catchup.tx.CatchupPollingProcess.Timers.TX_PULLER_TIMER;
import static org.neo4j.causalclustering.core.consensus.schedule.TimeoutFactory.fixedTimeout;
import static org.neo4j.kernel.impl.util.JobScheduler.Groups.pullUpdates;

/**
* This class is responsible for pulling transactions from a core server and queuing
Expand All @@ -62,9 +65,9 @@
*/
public class CatchupPollingProcess extends LifecycleAdapter
{
enum Timeouts implements TimeoutName
enum Timers implements TimerName
{
TX_PULLER_TIMEOUT
TX_PULLER_TIMER
}

enum State
Expand All @@ -81,20 +84,20 @@ enum State
private final Supplier<DatabaseHealth> databaseHealthSupplier;
private final CatchUpClient catchUpClient;
private final CoreMemberSelectionStrategy connectionStrategy;
private final RenewableTimeoutService timeoutService;
private final TimerService timerService;
private final long txPullIntervalMillis;
private final BatchingTxApplier applier;
private final PullRequestMonitor pullRequestMonitor;

private RenewableTimeout timeout;
private Timer timer;
private volatile State state = TX_PULLING;
private DatabaseHealth dbHealth;
private CompletableFuture<Boolean> upToDateFuture; // we are up-to-date when we are successfully pulling
private volatile long latestTxIdOfUpStream;

public CatchupPollingProcess( LogProvider logProvider, LocalDatabase localDatabase,
Lifecycle startStopOnStoreCopy, CatchUpClient catchUpClient,
CoreMemberSelectionStrategy connectionStrategy, RenewableTimeoutService timeoutService,
CoreMemberSelectionStrategy connectionStrategy, TimerService timerService,
long txPullIntervalMillis, BatchingTxApplier applier, Monitors monitors,
StoreCopyProcess storeCopyProcess, Supplier<DatabaseHealth> databaseHealthSupplier )
{
Expand All @@ -103,7 +106,7 @@ public CatchupPollingProcess( LogProvider logProvider, LocalDatabase localDataba
this.startStopOnStoreCopy = startStopOnStoreCopy;
this.catchUpClient = catchUpClient;
this.connectionStrategy = connectionStrategy;
this.timeoutService = timeoutService;
this.timerService = timerService;
this.txPullIntervalMillis = txPullIntervalMillis;
this.applier = applier;
this.pullRequestMonitor = monitors.newMonitor( PullRequestMonitor.class );
Expand All @@ -114,7 +117,8 @@ public CatchupPollingProcess( LogProvider logProvider, LocalDatabase localDataba
@Override
public synchronized void start() throws Throwable
{
timeout = timeoutService.create( TX_PULLER_TIMEOUT, txPullIntervalMillis, 0, timeout -> onTimeout() );
timer = timerService.create( TX_PULLER_TIMER, pullUpdates, timeout -> onTimeout() );
timer.set( fixedTimeout( txPullIntervalMillis, MILLISECONDS ) );
dbHealth = databaseHealthSupplier.get();
upToDateFuture = new CompletableFuture<>();
}
Expand All @@ -127,7 +131,7 @@ public Future<Boolean> upToDateFuture() throws InterruptedException
@Override
public void stop() throws Throwable
{
timeout.cancel();
timer.cancel( true, true );
}

public State state()
Expand Down Expand Up @@ -163,7 +167,7 @@ private void onTimeout()

if ( state != PANIC )
{
timeout.renew();
timer.reset();
}
}

Expand Down
Expand Up @@ -197,7 +197,6 @@ public void registerEditionSpecificProcedures( Procedures procedures ) throws Ke

dependencies.satisfyDependency( lockManager );

life.add( consensusModule.raftTimeoutService() );
life.add( coreServerModule.membershipWaiterLifecycle );
}

Expand Down

0 comments on commit 12a72ea

Please sign in to comment.