Skip to content

Commit

Permalink
More cleanups and javadocs for the job scheduler APIs.
Browse files Browse the repository at this point in the history
  • Loading branch information
chrisvest committed Aug 6, 2018
1 parent 77e9833 commit eaeedc4
Show file tree
Hide file tree
Showing 8 changed files with 102 additions and 18 deletions.
Expand Up @@ -19,47 +19,121 @@
*/
package org.neo4j.scheduler;

import java.util.OptionalInt;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;

import static java.lang.Runtime.getRuntime;
import static java.util.concurrent.Executors.newCachedThreadPool;
import static java.util.concurrent.Executors.newFixedThreadPool;
import static java.util.concurrent.Executors.newSingleThreadExecutor;

/**
* Implementations of this interface are used by the {@link JobHandle} implememtation to create the underlying {@link ExecutorService}s that actually run the
* scheduled jobs. The choice of implementation is decided by the scheduling {@link Group}, which can thereby influence how jobs in the particular group are
* executed.
*/
interface ExecutorServiceFactory
{
ExecutorService build( Group group, SchedulerThreadFactory factory, OptionalInt threadCount );
/**
* Create an {@link ExecutorService} with a default thread count.
*/
ExecutorService build( Group group, SchedulerThreadFactory factory );

/**
* Create an {@link ExecutorService}, ideally with the desired thread count if possible.
* Implementations are allowed to ignore the given thread count.
*/
ExecutorService build( Group group, SchedulerThreadFactory factory, int threadCount );

/**
* This factory actually prevents the scheduling and execution of any jobs, which is useful for groups that are not meant to be scheduled directly.
*/
static ExecutorServiceFactory unschedulable()
{
return ( group, factory, threadCount ) ->
return new ExecutorServiceFactory()
{
throw new IllegalArgumentException( "Tasks cannot be scheduled directly to the " + group.groupName() + " group." );
@Override
public ExecutorService build( Group group, SchedulerThreadFactory factory )
{
throw newUnschedulableException( group );
}

@Override
public ExecutorService build( Group group, SchedulerThreadFactory factory, int threadCount )
{
throw newUnschedulableException( group );
}

private IllegalArgumentException newUnschedulableException( Group group )
{
return new IllegalArgumentException( "Tasks cannot be scheduled directly to the " + group.groupName() + " group." );
}
};
}

/**
* Executes all jobs in the same single thread.
*/
static ExecutorServiceFactory singleThread()
{
return ( group, factory, threadCount ) -> newSingleThreadExecutor( factory );
return new ExecutorServiceFactory()
{
@Override
public ExecutorService build( Group group, SchedulerThreadFactory factory )
{
return newSingleThreadExecutor( factory );
}

@Override
public ExecutorService build( Group group, SchedulerThreadFactory factory, int threadCount )
{
return build( group, factory ); // Just ignore the thread count.
}
};
}

/**
* Execute jobs in a dynamically growing pool of threads. The threads will be cached and kept around for a little while to cope with work load spikes
* and troughs.
*/
static ExecutorServiceFactory cached()
{
return ( group, factory, threadCount ) ->
return new ExecutorServiceFactory()
{
if ( threadCount.isPresent() )
@Override
public ExecutorService build( Group group, SchedulerThreadFactory factory )
{
int threadCountAsInt = threadCount.getAsInt();
return newFixedThreadPool( threadCountAsInt, factory );
return newCachedThreadPool( factory );
}

@Override
public ExecutorService build( Group group, SchedulerThreadFactory factory, int threadCount )
{
return newFixedThreadPool( threadCount, factory );
}
return newCachedThreadPool( factory );
};
}

/**
* Schedules jobs in a work-stealing (ForkJoin) thread pool. {@link java.util.stream.Stream#parallel Parallel streams} and {@link ForkJoinTask}s started
* from within the scheduled jobs will also run inside the same {@link ForkJoinPool}.
*/
static ExecutorServiceFactory workStealing()
{
return ( group, factory, threadCount ) -> new ForkJoinPool( threadCount.orElse( getRuntime().availableProcessors() ), factory, null, false );
return new ExecutorServiceFactory()
{
@Override
public ExecutorService build( Group group, SchedulerThreadFactory factory )
{
return new ForkJoinPool( getRuntime().availableProcessors(), factory, null, false );
}

@Override
public ExecutorService build( Group group, SchedulerThreadFactory factory, int threadCount )
{
return new ForkJoinPool( threadCount, factory, null, false );
}
};
}
}
9 changes: 7 additions & 2 deletions community/common/src/main/java/org/neo4j/scheduler/Group.java
Expand Up @@ -36,7 +36,7 @@ public enum Group
/** Monitor and report system-wide pauses, in case they lead to service interruption. */
VM_PAUSE_MONITOR( "VmPauseMonitor" ),
/** Rotates diagnostic text logs. */
TEXT_LOG_ROTATION( "TextLogRotation" ),
LOG_ROTATION( "LogRotation" ),
/** Checkpoint and store flush. */
CHECKPOINT( "CheckPoint" ),
/** Various little periodic tasks that need to be done on a regular basis to keep the store in good shape. */
Expand Down Expand Up @@ -122,7 +122,12 @@ public String threadName()
return "neo4j." + groupName() + "-" + threadCounter.incrementAndGet();
}

public ExecutorService buildExecutorService( SchedulerThreadFactory factory, OptionalInt threadCount )
public ExecutorService buildExecutorService( SchedulerThreadFactory factory )
{
return executorServiceFactory.build( this, factory );
}

public ExecutorService buildExecutorService( SchedulerThreadFactory factory, int threadCount )
{
return executorServiceFactory.build( this, factory, threadCount );
}
Expand Down
Expand Up @@ -20,8 +20,13 @@
package org.neo4j.scheduler;

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinWorkerThread;
import java.util.concurrent.ThreadFactory;

/**
* Thread factories used by the {@link JobScheduler} need to be able to create both normal threads, and {@link ForkJoinWorkerThread}s.
* This interface collects the capabilities of both types of thread factories.
*/
public interface SchedulerThreadFactory extends ThreadFactory, ForkJoinPool.ForkJoinWorkerThreadFactory
{
}
Expand Up @@ -73,7 +73,7 @@ public Builder withRotation( long internalLogRotationThreshold, long internalLog
int maxInternalLogArchives, JobScheduler jobScheduler )
{
return withRotation( internalLogRotationThreshold, internalLogRotationDelay, maxInternalLogArchives,
jobScheduler.executor( Group.TEXT_LOG_ROTATION ) );
jobScheduler.executor( Group.LOG_ROTATION ) );
}

public Builder withRotation( long internalLogRotationThreshold, long internalLogRotationDelay,
Expand Down
Expand Up @@ -40,7 +40,7 @@ final class ThreadPool
ThreadPool( Group group, ThreadGroup parentThreadGroup )
{
threadFactory = new GroupedDaemonThreadFactory( group, parentThreadGroup );
executor = group.buildExecutorService( threadFactory, OptionalInt.empty() );
executor = group.buildExecutorService( threadFactory );
registry = new ConcurrentHashMap<>();
}

Expand Down
Expand Up @@ -127,7 +127,7 @@ private BiFunction<File,RotatingFileOutputStreamSupplier.RotationListener,Rotati
try
{
return new RotatingFileOutputStreamSupplier( fileSystem, file, rotationThreshold, 0, maxArchives,
scheduler.executor( Group.TEXT_LOG_ROTATION ), listener );
scheduler.executor( Group.LOG_ROTATION ), listener );
}
catch ( IOException e )
{
Expand Down
Expand Up @@ -193,7 +193,7 @@ private void buildRotatingLog( long rotationThreshold, int maxArchives ) throws
RotatingFileOutputStreamSupplier rotatingSupplier = new RotatingFileOutputStreamSupplier(
fileSystem, currentQueryLogFile,
rotationThreshold, 0, maxArchives,
scheduler.executor( Group.TEXT_LOG_ROTATION ) );
scheduler.executor( Group.LOG_ROTATION ) );
log = logBuilder.toOutputStream( rotatingSupplier );
closable = rotatingSupplier;
}
Expand Down
Expand Up @@ -216,7 +216,7 @@ public static SecurityLog create( Config config, Log log, FileSystemAbstraction
try
{
return new SecurityLog( config, fileSystem,
jobScheduler.executor( Group.TEXT_LOG_ROTATION ) );
jobScheduler.executor( Group.LOG_ROTATION ) );
}
catch ( IOException ioe )
{
Expand Down

0 comments on commit eaeedc4

Please sign in to comment.