Skip to content

Commit

Permalink
Remove NEW_THREAD scheduling strategy from Noe4jJobScheduler
Browse files Browse the repository at this point in the history
Since the best way to create a new thread by using the scheduler is to
use the thread factory, this changes will make more straightforward
how to create new thread using the scheduler.  And also note that the
NEW_THREAD strategy was unsupported in several schedule methods.
  • Loading branch information
davidegrohmann committed May 22, 2017
1 parent c3a5cbf commit 5b2b0b9
Show file tree
Hide file tree
Showing 10 changed files with 67 additions and 207 deletions.
Expand Up @@ -29,22 +29,11 @@

import org.neo4j.kernel.lifecycle.Lifecycle;

import static org.neo4j.scheduler.JobScheduler.SchedulingStrategy.NEW_THREAD;
import static org.neo4j.scheduler.JobScheduler.SchedulingStrategy.POOLED;

/**
* To be expanded, the idea here is to have a database-global service for running jobs, handling jobs crashing and so on.
*/
public interface JobScheduler extends Lifecycle
{
enum SchedulingStrategy
{
/** Create a new thread each time a job is scheduled */
NEW_THREAD,
/** Run the job from a pool of threads, shared among all groups with this strategy */
POOLED
}

/**
* Represents a common group of jobs, defining how they should be scheduled.
*/
Expand All @@ -53,26 +42,19 @@ class Group
public static final String THREAD_ID = "thread-id";
public static final Map<String, String> NO_METADATA = Collections.emptyMap();

private final AtomicInteger threadCounter = new AtomicInteger();
private final String name;
private final SchedulingStrategy strategy;
private final AtomicInteger threadCounter = new AtomicInteger( 0 );

public Group( String name, SchedulingStrategy strategy )
public Group( String name )
{
this.name = name;
this.strategy = strategy;
}

public String name()
{
return name;
}

public SchedulingStrategy strategy()
{
return strategy;
}

/**
* Name a new thread. This method may or may not be used, it is up to the scheduling strategy to decide
* to honor this.
Expand All @@ -98,89 +80,89 @@ public String threadName( Map<String, String> metadata )
class Groups
{
/** Session workers, these perform the work of actually executing client queries. */
public static final Group sessionWorker = new Group( "Session", NEW_THREAD );
public static final Group sessionWorker = new Group( "Session" );

/** Background index population */
public static final Group indexPopulation = new Group( "IndexPopulation", POOLED );
public static final Group indexPopulation = new Group( "IndexPopulation" );

/** Push transactions from master to slaves */
public static final Group masterTransactionPushing = new Group( "TransactionPushing", POOLED );
public static final Group masterTransactionPushing = new Group( "TransactionPushing" );

/**
* Rolls back idle transactions on the server.
*/
public static final Group serverTransactionTimeout = new Group( "ServerTransactionTimeout", POOLED );
public static final Group serverTransactionTimeout = new Group( "ServerTransactionTimeout" );

/**
* Aborts idle slave lock sessions on the master.
*/
public static final Group slaveLocksTimeout = new Group( "SlaveLocksTimeout", POOLED );
public static final Group slaveLocksTimeout = new Group( "SlaveLocksTimeout" );

/**
* Pulls updates from the master.
*/
public static final Group pullUpdates = new Group( "PullUpdates", POOLED );
public static final Group pullUpdates = new Group( "PullUpdates" );

/**
* Gathers approximated data about the underlying data store.
*/
public static final Group indexSamplingController = new Group( "IndexSamplingController", POOLED );
public static final Group indexSampling = new Group( "IndexSampling", POOLED );
public static final Group indexSamplingController = new Group( "IndexSamplingController" );
public static final Group indexSampling = new Group( "IndexSampling" );

/**
* Rotates internal diagnostic logs
*/
public static final Group internalLogRotation = new Group( "InternalLogRotation", POOLED );
public static final Group internalLogRotation = new Group( "InternalLogRotation" );

/**
* Rotates query logs
*/
public static final Group queryLogRotation = new Group( "queryLogRotation", POOLED );
public static final Group queryLogRotation = new Group( "queryLogRotation" );

/**
* Checkpoint and store flush
*/
public static final Group checkPoint = new Group( "CheckPoint", POOLED );
public static final Group checkPoint = new Group( "CheckPoint" );

/**
* Raft Log pruning
*/
public static final Group raftLogPruning = new Group( "RaftLogPruning", POOLED );
public static final Group raftLogPruning = new Group( "RaftLogPruning" );

/**
* Network IO threads for the Bolt protocol.
*/
public static final Group boltNetworkIO = new Group( "BoltNetworkIO", NEW_THREAD );
public static final Group boltNetworkIO = new Group( "BoltNetworkIO" );

/**
* Reporting thread for Metrics events
*/
public static final Group metricsEvent = new Group( "MetricsEvent", POOLED );
public static final Group metricsEvent = new Group( "MetricsEvent" );

/**
* UDC timed events.
*/
public static Group udc = new Group( "UsageDataCollection", POOLED );
public static Group udc = new Group( "UsageDataCollection" );

/**
* Storage maintenance.
*/
public static Group storageMaintenance = new Group( "StorageMaintenance", POOLED );
public static Group storageMaintenance = new Group( "StorageMaintenance" );

/**
* Native security.
*/
public static Group nativeSecurity = new Group( "NativeSecurity", POOLED );
public static Group nativeSecurity = new Group( "NativeSecurity" );

/**
* File watch service group
*/
public static Group fileWatch = new Group( "FileWatcher", NEW_THREAD );
public static Group fileWatch = new Group( "FileWatcher" );

/**
* Recovery cleanup.
*/
public static Group recoveryCleanup = new Group( "RecoveryCleanup", POOLED );
public static Group recoveryCleanup = new Group( "RecoveryCleanup" );

private Groups()
{
Expand Down
Expand Up @@ -86,17 +86,7 @@ public JobHandle schedule( Group group, Runnable job, Map<String,String> metadat
throw new RejectedExecutionException( "Scheduler is not started" );
}

switch ( group.strategy() )
{
case POOLED:
return register( new PooledJobHandle( this.globalPool.submit( job ) ) );
case NEW_THREAD:
Thread thread = createNewThread( group, job, metadata );
thread.start();
return new SingleThreadHandle( thread );
default:
throw new IllegalArgumentException( "Unsupported strategy for scheduling job: " + group.strategy() );
}
return register( new PooledJobHandle( this.globalPool.submit( job ) ) );
}

private JobHandle register( PooledJobHandle pooledJobHandle )
Expand Down Expand Up @@ -136,27 +126,15 @@ public JobHandle scheduleRecurring( Group group, final Runnable runnable, long p

@Override
public JobHandle scheduleRecurring( Group group, final Runnable runnable, long initialDelay, long period,
TimeUnit timeUnit )
TimeUnit timeUnit )
{
switch ( group.strategy() )
{
case POOLED:
return new PooledJobHandle( scheduledExecutor.scheduleAtFixedRate( runnable, initialDelay, period, timeUnit ) );
default:
throw new IllegalArgumentException( "Unsupported strategy to use for recurring jobs: " + group.strategy() );
}
return new PooledJobHandle( scheduledExecutor.scheduleAtFixedRate( runnable, initialDelay, period, timeUnit ) );
}

@Override
public JobHandle schedule( Group group, final Runnable runnable, long initialDelay, TimeUnit timeUnit )
{
switch ( group.strategy() )
{
case POOLED:
return new PooledJobHandle( scheduledExecutor.schedule( runnable, initialDelay, timeUnit ) );
default:
throw new IllegalArgumentException( "Unsupported strategy to use for delayed jobs: " + group.strategy() );
}
return new PooledJobHandle( scheduledExecutor.schedule( runnable, initialDelay, timeUnit ) );
}

@Override
Expand Down Expand Up @@ -267,29 +245,4 @@ public void registerCancelListener( CancelListener listener )
cancelListeners.add( listener );
}
}

private static class SingleThreadHandle implements JobHandle
{
private final Thread thread;

SingleThreadHandle( Thread thread )
{
this.thread = thread;
}

@Override
public void cancel( boolean mayInterruptIfRunning )
{
if ( mayInterruptIfRunning )
{
thread.interrupt();
}
}

@Override
public void waitTermination() throws InterruptedException
{
thread.join();
}
}
}
Expand Up @@ -20,11 +20,8 @@
package org.neo4j.kernel.impl.util;

import org.junit.After;
import org.junit.Assert;
import org.junit.Test;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
Expand All @@ -34,9 +31,10 @@
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.LockSupport;

import org.neo4j.kernel.lifecycle.LifeSupport;
import org.neo4j.scheduler.JobScheduler;
import org.neo4j.scheduler.JobScheduler.JobHandle;
import org.neo4j.kernel.lifecycle.LifeSupport;

import static java.lang.Thread.sleep;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.hamcrest.MatcherAssert.assertThat;
Expand All @@ -46,11 +44,7 @@
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.neo4j.helpers.Exceptions.launderedException;
import static org.neo4j.helpers.collection.MapUtil.stringMap;
import static org.neo4j.scheduler.JobScheduler.Group.THREAD_ID;
import static org.neo4j.scheduler.JobScheduler.Groups.indexPopulation;
import static org.neo4j.scheduler.JobScheduler.SchedulingStrategy.NEW_THREAD;
import static org.neo4j.scheduler.JobScheduler.SchedulingStrategy.POOLED;
import static org.neo4j.test.ReflectionUtil.replaceValueInPrivateField;

public class Neo4jJobSchedulerTest
Expand Down Expand Up @@ -119,41 +113,6 @@ public void shouldCancelRecurringJob() throws Exception
assertThat( invocations.get(), equalTo( recorded ) );
}

@Test
public void shouldRunJobInNewThread() throws Throwable
{
// Given
life.start();

// We start a thread that will signal when it's running, and remain running until we tell it to stop.
// This way we can check and make sure a thread with the name we expect is live and well
final CountDownLatch threadStarted = new CountDownLatch( 1 );
final CountDownLatch unblockThread = new CountDownLatch( 1 );

// When
scheduler.schedule( new JobScheduler.Group( "MyGroup", NEW_THREAD ),
waitForLatch( threadStarted, unblockThread ), stringMap( THREAD_ID, "MyTestThread" ) );
threadStarted.await();

// Then
try
{
String threadName = "neo4j.MyGroup-MyTestThread";
for ( String name : threadNames() )
{
if ( name.equals( threadName ) )
{
return;
}
}
Assert.fail( "Expected a thread named '" + threadName + "' in " + threadNames() );
}
finally
{
unblockThread.countDown();
}
}

@Test
public void shouldRunWithDelay() throws Throwable
{
Expand All @@ -165,14 +124,10 @@ public void shouldRunWithDelay() throws Throwable

long time = System.nanoTime();

scheduler.schedule( new JobScheduler.Group( "group", POOLED ), new Runnable()
scheduler.schedule( new JobScheduler.Group( "group" ), () ->
{
@Override
public void run()
{
runTime.set( System.nanoTime() );
latch.countDown();
}
runTime.set( System.nanoTime() );
latch.countDown();
}, 100, TimeUnit.MILLISECONDS );

latch.await();
Expand Down Expand Up @@ -236,32 +191,6 @@ public void shouldNotifyCancelListeners() throws Exception
neo4jJobScheduler.shutdown();
}

private List<String> threadNames()
{
List<String> names = new ArrayList<>();
for ( Thread thread : Thread.getAllStackTraces().keySet() )
{
names.add( thread.getName() );
}
return names;
}

private Runnable waitForLatch( final CountDownLatch threadStarted, final CountDownLatch runUntil )
{
return () ->
{
try
{
threadStarted.countDown();
runUntil.await();
}
catch ( InterruptedException e )
{
throw new RuntimeException( e );
}
};
}

private void awaitFirstInvocation()
{
while ( invocations.get() == 0 )
Expand Down

0 comments on commit 5b2b0b9

Please sign in to comment.