Skip to content

Commit

Permalink
Move the Neo4jJobScheduler to a dedicated scheduler package, and rena…
Browse files Browse the repository at this point in the history
…me it to CentralJobScheduler.
  • Loading branch information
chrisvest committed Mar 15, 2018
1 parent 9ca7292 commit 22ebc0f
Show file tree
Hide file tree
Showing 27 changed files with 77 additions and 77 deletions.
Expand Up @@ -43,7 +43,6 @@
import org.neo4j.bolt.runtime.CachedThreadPoolExecutorFactory;
import org.neo4j.bolt.runtime.DefaultBoltConnectionFactory;
import org.neo4j.bolt.runtime.ExecutorBoltSchedulerProvider;
import org.neo4j.bolt.security.auth.AuthenticationException;
import org.neo4j.bolt.security.auth.AuthenticationResult;
import org.neo4j.bolt.testing.BoltResponseRecorder;
import org.neo4j.bolt.testing.RecordedBoltResponse;
Expand All @@ -58,7 +57,7 @@
import org.neo4j.kernel.configuration.BoltConnector;
import org.neo4j.kernel.configuration.Config;
import org.neo4j.kernel.impl.logging.NullLogService;
import org.neo4j.kernel.impl.util.Neo4jJobScheduler;
import org.neo4j.kernel.impl.scheduler.CentralJobScheduler;
import org.neo4j.kernel.impl.util.ValueUtils;
import org.neo4j.kernel.lifecycle.LifeSupport;
import org.neo4j.kernel.monitoring.Monitors;
Expand Down Expand Up @@ -93,7 +92,7 @@ public class ResetFuzzTest
/** We track the number of un-closed transactions, and fail if we ever leak one */
private final AtomicLong liveTransactions = new AtomicLong();
private final Monitors monitors = new Monitors();
private final Neo4jJobScheduler scheduler = life.add(new Neo4jJobScheduler());
private final CentralJobScheduler scheduler = life.add( new CentralJobScheduler() );
private final BoltSchedulerProvider boltSchedulerProvider = life.add(
new ExecutorBoltSchedulerProvider( createConfig(), new CachedThreadPoolExecutorFactory( NullLog.getInstance() ), scheduler,
NullLogService.getInstance() ) );
Expand Down
Expand Up @@ -55,7 +55,7 @@
import org.neo4j.kernel.impl.logging.StoreLogService;
import org.neo4j.kernel.impl.store.format.RecordFormatSelector;
import org.neo4j.kernel.impl.util.Converters;
import org.neo4j.kernel.impl.util.Neo4jJobScheduler;
import org.neo4j.kernel.impl.scheduler.CentralJobScheduler;
import org.neo4j.kernel.impl.util.Validator;
import org.neo4j.kernel.impl.util.Validators;
import org.neo4j.kernel.internal.Version;
Expand Down Expand Up @@ -561,7 +561,7 @@ public static void doImport( PrintStream out, PrintStream err, InputStream in, F
dbConfig.augment( logs_directory, logsDir.getCanonicalPath() );
File internalLogFile = dbConfig.get( store_internal_log_path );
LogService logService = life.add( StoreLogService.withInternalLog( internalLogFile ).build( fs ) );
final Neo4jJobScheduler jobScheduler = life.add( new Neo4jJobScheduler() );
final CentralJobScheduler jobScheduler = life.add( new CentralJobScheduler() );

life.start();
ExecutionMonitor executionMonitor = detailedProgress
Expand Down
Expand Up @@ -34,7 +34,7 @@
import org.neo4j.kernel.configuration.Settings;
import org.neo4j.kernel.impl.logging.SimpleLogService;
import org.neo4j.kernel.impl.store.format.RecordFormatSelector;
import org.neo4j.kernel.impl.util.Neo4jJobScheduler;
import org.neo4j.kernel.impl.scheduler.CentralJobScheduler;
import org.neo4j.logging.LogProvider;
import org.neo4j.logging.NullLogProvider;
import org.neo4j.scheduler.JobScheduler;
Expand Down Expand Up @@ -164,7 +164,7 @@ public long maxMemoryUsage()
else
{
System.out.println( "Seed " + randomSeed );
final JobScheduler jobScheduler = new Neo4jJobScheduler();
final JobScheduler jobScheduler = new CentralJobScheduler();
consumer = BatchImporterFactory.withHighestPriority().instantiate( dir, fileSystem, null, importConfig,
new SimpleLogService( logging, logging ), defaultVisible( jobScheduler ), EMPTY, dbConfig,
RecordFormatSelector.selectForConfig( dbConfig, logging ), NO_MONITOR );
Expand Down
Expand Up @@ -49,7 +49,7 @@
import org.neo4j.kernel.impl.transaction.log.checkpoint.CheckPointerMonitor;
import org.neo4j.kernel.impl.transaction.log.checkpoint.StoreCopyCheckPointMutex;
import org.neo4j.kernel.impl.transaction.state.DataSourceManager;
import org.neo4j.kernel.impl.util.Neo4jJobScheduler;
import org.neo4j.kernel.impl.scheduler.CentralJobScheduler;
import org.neo4j.kernel.info.DiagnosticsManager;
import org.neo4j.kernel.info.JvmChecker;
import org.neo4j.kernel.info.JvmMetadataRepository;
Expand Down Expand Up @@ -301,7 +301,7 @@ protected LogService createLogService( LogProvider userLogProvider )

protected JobScheduler createJobScheduler()
{
return new Neo4jJobScheduler();
return new CentralJobScheduler();
}

protected PageCache createPageCache( FileSystemAbstraction fileSystem, Config config, LogService logging,
Expand Down
Expand Up @@ -17,7 +17,7 @@
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.kernel.impl.util;
package org.neo4j.kernel.impl.scheduler;

import java.util.Collections;
import java.util.HashSet;
Expand Down Expand Up @@ -48,7 +48,7 @@

import static org.neo4j.kernel.impl.util.DebugUtil.trackTest;

public class Neo4jJobScheduler extends LifecycleAdapter implements JobScheduler
public class CentralJobScheduler extends LifecycleAdapter implements JobScheduler
{
private static final AtomicInteger INSTANCE_COUNTER = new AtomicInteger();
private static final Group SCHEDULER_GROUP = new Group( "Scheduler" );
Expand All @@ -67,7 +67,7 @@ public class Neo4jJobScheduler extends LifecycleAdapter implements JobScheduler

private volatile boolean started;

public Neo4jJobScheduler()
public CentralJobScheduler()
{
workStealingExecutors = new ConcurrentHashMap<>( 1 );
topLevelGroup = new ThreadGroup( "Neo4j-" + INSTANCE_COUNTER.incrementAndGet() + trackTest() );
Expand Down
Expand Up @@ -79,7 +79,7 @@
import org.neo4j.kernel.impl.transaction.state.DefaultIndexProviderMap;
import org.neo4j.kernel.impl.transaction.state.DirectIndexUpdates;
import org.neo4j.kernel.impl.transaction.state.IndexUpdates;
import org.neo4j.kernel.impl.util.Neo4jJobScheduler;
import org.neo4j.kernel.impl.scheduler.CentralJobScheduler;
import org.neo4j.kernel.lifecycle.LifeRule;
import org.neo4j.kernel.lifecycle.LifecycleException;
import org.neo4j.logging.AssertableLogProvider;
Expand Down Expand Up @@ -1224,7 +1224,7 @@ private IndexingService newIndexingServiceWithMockedDependencies( IndexPopulator
Config config = Config.defaults( GraphDatabaseSettings.multi_threaded_schema_index_population_enabled, "false" );

return life.add( IndexingServiceFactory.createIndexingService( config,
life.add( new Neo4jJobScheduler() ),
life.add( new CentralJobScheduler() ),
new DefaultIndexProviderMap( indexProvider ),
storeView,
nameLookup,
Expand Down
Expand Up @@ -32,7 +32,7 @@
import org.neo4j.kernel.api.schema.SchemaDescriptorFactory;
import org.neo4j.kernel.api.schema.index.SchemaIndexDescriptor;
import org.neo4j.kernel.api.schema.index.SchemaIndexDescriptorFactory;
import org.neo4j.kernel.impl.util.Neo4jJobScheduler;
import org.neo4j.kernel.impl.scheduler.CentralJobScheduler;
import org.neo4j.scheduler.JobScheduler;
import org.neo4j.test.DoubleLatch;

Expand Down Expand Up @@ -63,7 +63,7 @@ public void shouldNotRunASampleJobWhichIsAlreadyRunning() throws Throwable
{
// given
when( config.jobLimit() ).thenReturn( 2 );
JobScheduler jobScheduler = new Neo4jJobScheduler();
JobScheduler jobScheduler = new CentralJobScheduler();
jobScheduler.init();
IndexSamplingJobTracker jobTracker = new IndexSamplingJobTracker( config, jobScheduler );
final DoubleLatch latch = new DoubleLatch();
Expand Down Expand Up @@ -104,7 +104,7 @@ public void shouldNotAcceptMoreJobsThanAllowed() throws Throwable
{
// given
when( config.jobLimit() ).thenReturn( 1 );
JobScheduler jobScheduler = new Neo4jJobScheduler();
JobScheduler jobScheduler = new CentralJobScheduler();
jobScheduler.init();

final IndexSamplingJobTracker jobTracker = new IndexSamplingJobTracker( config, jobScheduler );
Expand Down Expand Up @@ -168,7 +168,7 @@ public void shouldAcceptNewJobWhenRunningJobFinishes() throws Throwable
// Given
when( config.jobLimit() ).thenReturn( 1 );

JobScheduler jobScheduler = new Neo4jJobScheduler();
JobScheduler jobScheduler = new CentralJobScheduler();
jobScheduler.init();

final IndexSamplingJobTracker jobTracker = new IndexSamplingJobTracker( config, jobScheduler );
Expand Down Expand Up @@ -254,7 +254,7 @@ public void shouldStopAndWaitForAllJobsToFinish() throws Throwable
// Given
when( config.jobLimit() ).thenReturn( 2 );

JobScheduler jobScheduler = new Neo4jJobScheduler();
JobScheduler jobScheduler = new CentralJobScheduler();
jobScheduler.init();

final IndexSamplingJobTracker jobTracker = new IndexSamplingJobTracker( config, jobScheduler );
Expand Down Expand Up @@ -292,7 +292,7 @@ public void shouldWaitForAllJobsToFinish() throws Throwable
// Given
when( config.jobLimit() ).thenReturn( 2 );

JobScheduler jobScheduler = new Neo4jJobScheduler();
JobScheduler jobScheduler = new CentralJobScheduler();
jobScheduler.init();

final IndexSamplingJobTracker jobTracker = new IndexSamplingJobTracker( config, jobScheduler );
Expand Down
Expand Up @@ -17,7 +17,7 @@
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.kernel.impl.util;
package org.neo4j.kernel.impl.scheduler;

import org.junit.After;
import org.junit.Test;
Expand Down Expand Up @@ -50,11 +50,11 @@
import static org.junit.Assert.fail;
import static org.neo4j.scheduler.JobScheduler.Groups.indexPopulation;

public class Neo4jJobSchedulerTest
public class CentralJobSchedulerTest
{
private final AtomicInteger invocations = new AtomicInteger();
private final LifeSupport life = new LifeSupport();
private final Neo4jJobScheduler scheduler = life.add( new Neo4jJobScheduler() );
private final CentralJobScheduler scheduler = life.add( new CentralJobScheduler() );

private final Runnable countInvocationsJob = invocations::incrementAndGet;

Expand Down Expand Up @@ -189,8 +189,8 @@ public void longRunningScheduledJobsMustNotDelayOtherLongRunningJobs()
public void shouldNotifyCancelListeners()
{
// GIVEN
Neo4jJobScheduler neo4jJobScheduler = new Neo4jJobScheduler();
neo4jJobScheduler.init();
CentralJobScheduler centralJobScheduler = new CentralJobScheduler();
centralJobScheduler.init();

// WHEN
AtomicBoolean halted = new AtomicBoolean();
Expand All @@ -201,19 +201,19 @@ public void shouldNotifyCancelListeners()
LockSupport.parkNanos( MILLISECONDS.toNanos( 10 ) );
}
};
JobHandle handle = neo4jJobScheduler.schedule( indexPopulation, job );
JobHandle handle = centralJobScheduler.schedule( indexPopulation, job );
handle.registerCancelListener( mayBeInterrupted -> halted.set( true ) );
handle.cancel( false );

// THEN
assertTrue( halted.get() );
neo4jJobScheduler.shutdown();
centralJobScheduler.shutdown();
}

@Test( timeout = 10_000 )
public void waitTerminationOnDelayedJobMustWaitUntilJobCompletion() throws Exception
{
Neo4jJobScheduler scheduler = new Neo4jJobScheduler();
CentralJobScheduler scheduler = new CentralJobScheduler();
scheduler.init();

AtomicBoolean triggered = new AtomicBoolean();
Expand All @@ -232,7 +232,7 @@ public void waitTerminationOnDelayedJobMustWaitUntilJobCompletion() throws Excep
@Test( timeout = 10_000 )
public void scheduledTasksThatThrowsMustPropagateException() throws Exception
{
Neo4jJobScheduler scheduler = new Neo4jJobScheduler();
CentralJobScheduler scheduler = new CentralJobScheduler();
scheduler.init();

RuntimeException boom = new RuntimeException( "boom" );
Expand All @@ -258,7 +258,7 @@ public void scheduledTasksThatThrowsMustPropagateException() throws Exception
@Test( timeout = 10_000 )
public void scheduledTasksThatThrowsShouldStop() throws Exception
{
Neo4jJobScheduler scheduler = new Neo4jJobScheduler();
CentralJobScheduler scheduler = new CentralJobScheduler();
scheduler.init();

BinaryLatch triggerLatch = new BinaryLatch();
Expand Down
Expand Up @@ -19,21 +19,21 @@
*/
package org.neo4j.kernel.impl.util;

import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import org.neo4j.kernel.impl.scheduler.CentralJobScheduler;
import org.neo4j.scheduler.JobScheduler;

public class CountingJobScheduler implements JobScheduler
{
private final AtomicInteger counter;
private final Neo4jJobScheduler delegate;
private final CentralJobScheduler delegate;

public CountingJobScheduler( AtomicInteger counter, Neo4jJobScheduler delegate )
public CountingJobScheduler( AtomicInteger counter, CentralJobScheduler delegate )
{
this.counter = counter;
this.delegate = delegate;
Expand Down
Expand Up @@ -28,20 +28,21 @@

import org.neo4j.io.fs.watcher.FileWatcher;
import org.neo4j.io.fs.watcher.SilentFileWatcher;
import org.neo4j.kernel.impl.scheduler.CentralJobScheduler;
import org.neo4j.kernel.impl.util.watcher.FileWatcherLifecycleAdapter;

import static org.mockito.Mockito.verify;

public class FileWatcherLifecycleAdapterTest
{

private static Neo4jJobScheduler jobScheduler;
private static CentralJobScheduler jobScheduler;
private FileWatcher fileWatcher = Mockito.mock( FileWatcher.class );

@BeforeClass
public static void setUp()
{
jobScheduler = new Neo4jJobScheduler();
jobScheduler = new CentralJobScheduler();
}

@AfterClass
Expand Down
Expand Up @@ -53,7 +53,7 @@
import org.neo4j.kernel.impl.store.id.configuration.CommunityIdTypeConfigurationProvider;
import org.neo4j.kernel.impl.transaction.state.DefaultIndexProviderMap;
import org.neo4j.kernel.impl.util.IdOrderingQueue;
import org.neo4j.kernel.impl.util.Neo4jJobScheduler;
import org.neo4j.kernel.impl.scheduler.CentralJobScheduler;
import org.neo4j.kernel.impl.util.SynchronizedArrayIdOrderingQueue;
import org.neo4j.kernel.internal.DatabaseHealth;
import org.neo4j.kernel.internal.KernelEventHandlers;
Expand Down Expand Up @@ -107,7 +107,7 @@ private RecordStorageEngine get( FileSystemAbstraction fs, PageCache pageCache,
ExplicitIndexProviderLookup explicitIndexProviderLookup = mock( ExplicitIndexProviderLookup.class );
when( explicitIndexProviderLookup.all() ).thenReturn( Iterables.empty() );
IndexConfigStore indexConfigStore = new IndexConfigStore( storeDirectory, fs );
JobScheduler scheduler = life.add( new Neo4jJobScheduler() );
JobScheduler scheduler = life.add( new CentralJobScheduler() );
Config config = Config.defaults();

BufferingIdGeneratorFactory bufferingIdGeneratorFactory =
Expand Down
Expand Up @@ -51,7 +51,7 @@
import org.neo4j.kernel.impl.factory.PlatformModule;
import org.neo4j.kernel.impl.logging.LogService;
import org.neo4j.kernel.impl.logging.NullLogService;
import org.neo4j.kernel.impl.util.Neo4jJobScheduler;
import org.neo4j.kernel.impl.scheduler.CentralJobScheduler;
import org.neo4j.logging.LogProvider;
import org.neo4j.storageengine.api.schema.IndexReader;
import org.neo4j.test.rule.PageCacheAndDependenciesRule;
Expand Down Expand Up @@ -116,7 +116,7 @@ protected PlatformModule createPlatform( File storeDir, Config config, Dependenc
return new PlatformModule( storeDir, config, databaseInfo, dependencies, graphDatabaseFacade )
{
@Override
protected Neo4jJobScheduler createJobScheduler()
protected CentralJobScheduler createJobScheduler()
{
return newSlowJobScheduler();
}
Expand All @@ -132,9 +132,9 @@ protected LogService createLogService( LogProvider userLogProvider )
graphDatabaseFactoryState.databaseDependencies() );
}

private static Neo4jJobScheduler newSlowJobScheduler()
private static CentralJobScheduler newSlowJobScheduler()
{
return new Neo4jJobScheduler()
return new CentralJobScheduler()
{
@Override
public JobHandle schedule( Group group, Runnable job )
Expand Down
Expand Up @@ -25,7 +25,7 @@
import java.util.concurrent.Semaphore;
import java.util.concurrent.locks.LockSupport;

import org.neo4j.kernel.impl.util.Neo4jJobScheduler;
import org.neo4j.kernel.impl.scheduler.CentralJobScheduler;
import org.neo4j.kernel.lifecycle.Lifespan;
import org.neo4j.logging.NullLogProvider;
import org.neo4j.scheduler.JobScheduler.Group;
Expand All @@ -39,7 +39,7 @@ public class ContinuousJobTest
{
private static final long DEFAULT_TIMEOUT_MS = 15_000;
private final Group jobGroup = new Group( "test" );
private final Neo4jJobScheduler scheduler = new Neo4jJobScheduler();
private final CentralJobScheduler scheduler = new CentralJobScheduler();

@Test
public void shouldRunJobContinuously() throws Throwable
Expand Down
Expand Up @@ -38,7 +38,7 @@
import org.neo4j.causalclustering.identity.RaftTestMemberSetBuilder;
import org.neo4j.causalclustering.messaging.TestNetwork;
import org.neo4j.function.Predicates;
import org.neo4j.kernel.impl.util.Neo4jJobScheduler;
import org.neo4j.kernel.impl.scheduler.CentralJobScheduler;
import org.neo4j.logging.NullLogProvider;
import org.neo4j.scheduler.JobScheduler;

Expand All @@ -51,7 +51,7 @@ public class Fixture
private final Set<MemberId> members = new HashSet<>();
private final Set<BootstrapWaiter> bootstrapWaiters = new HashSet<>();
private final List<TimerService> timerServices = new ArrayList<>();
private final JobScheduler scheduler = new Neo4jJobScheduler();
private final JobScheduler scheduler = new CentralJobScheduler();
final Set<RaftFixture> rafts = new HashSet<>();
final TestNetwork net;

Expand Down

0 comments on commit 22ebc0f

Please sign in to comment.