Skip to content

Commit

Permalink
Extract inner classes out of the JobScheduler interface, and into top…
Browse files Browse the repository at this point in the history
…-level.
  • Loading branch information
chrisvest committed Aug 6, 2018
1 parent 27a629e commit ff6f225
Show file tree
Hide file tree
Showing 52 changed files with 262 additions and 157 deletions.
@@ -0,0 +1,33 @@
/*
* Copyright (c) 2002-2018 "Neo4j,"
* Neo4j Sweden AB [http://neo4j.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.scheduler;

/**
* Gets notified about calls to {@link JobHandle#cancel(boolean)}.
*/
public interface CancelListener
{
/**
* Notification that {@link JobHandle#cancel(boolean)} was called.
*
* @param mayInterruptIfRunning argument from {@link JobHandle#cancel(boolean)} call.
*/
void cancelled( boolean mayInterruptIfRunning );
}
75 changes: 75 additions & 0 deletions community/common/src/main/java/org/neo4j/scheduler/Group.java
@@ -0,0 +1,75 @@
/*
* Copyright (c) 2002-2018 "Neo4j,"
* Neo4j Sweden AB [http://neo4j.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.scheduler;

import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;

/**
* Represents a common group of jobs, defining how they should be scheduled.
*/
public final class Group
{
private final AtomicInteger threadCounter = new AtomicInteger();
private final String name;

Group( String name )
{
Objects.requireNonNull( name, "Group name cannot be null." );
this.name = name;
}

public String name()
{
return name;
}

/**
* Name a new thread. This method may or may not be used, it is up to the scheduling strategy to decide
* to honor this.
*/
public String threadName()
{
return "neo4j." + name() + "-" + threadCounter.incrementAndGet();
}

@Override
public boolean equals( Object o )
{
if ( this == o )
{
return true;
}
if ( o == null || getClass() != o.getClass() )
{
return false;
}

Group group = (Group) o;

return name.equals( group.name );
}

@Override
public int hashCode()
{
return name.hashCode();
}
}
35 changes: 35 additions & 0 deletions community/common/src/main/java/org/neo4j/scheduler/JobHandle.java
@@ -0,0 +1,35 @@
/*
* Copyright (c) 2002-2018 "Neo4j,"
* Neo4j Sweden AB [http://neo4j.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.scheduler;

import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;

public interface JobHandle
{
void cancel( boolean mayInterruptIfRunning );

void waitTermination() throws InterruptedException, ExecutionException, CancellationException;

default void registerCancelListener( CancelListener listener )
{
throw new UnsupportedOperationException( "Unsupported in this implementation" );
}
}
Expand Up @@ -19,15 +19,11 @@
*/ */
package org.neo4j.scheduler; package org.neo4j.scheduler;


import java.util.Objects;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.ForkJoinPool; import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;


import org.neo4j.kernel.lifecycle.Lifecycle; import org.neo4j.kernel.lifecycle.Lifecycle;


Expand All @@ -37,58 +33,6 @@
*/ */
public interface JobScheduler extends Lifecycle public interface JobScheduler extends Lifecycle
{ {
/**
* Represents a common group of jobs, defining how they should be scheduled.
*/
final class Group
{
private final AtomicInteger threadCounter = new AtomicInteger();
private final String name;

private Group( String name )
{
Objects.requireNonNull( name, "Group name cannot be null." );
this.name = name;
}

public String name()
{
return name;
}

/**
* Name a new thread. This method may or may not be used, it is up to the scheduling strategy to decide
* to honor this.
*/
public String threadName()
{
return "neo4j." + name() + "-" + threadCounter.incrementAndGet();
}

@Override
public boolean equals( Object o )
{
if ( this == o )
{
return true;
}
if ( o == null || getClass() != o.getClass() )
{
return false;
}

Group group = (Group) o;

return name.equals( group.name );
}

@Override
public int hashCode()
{
return name.hashCode();
}
}

/** /**
* This is an exhaustive list of job types that run in the database. It should be expanded as needed for new groups * This is an exhaustive list of job types that run in the database. It should be expanded as needed for new groups
* of jobs. * of jobs.
Expand Down Expand Up @@ -242,31 +186,6 @@ private Groups()
} }
} }


interface JobHandle
{
void cancel( boolean mayInterruptIfRunning );

void waitTermination() throws InterruptedException, ExecutionException, CancellationException;

default void registerCancelListener( CancelListener listener )
{
throw new UnsupportedOperationException( "Unsupported in this implementation" );
}
}

/**
* Gets notified about calls to {@link JobHandle#cancel(boolean)}.
*/
interface CancelListener
{
/**
* Notification that {@link JobHandle#cancel(boolean)} was called.
*
* @param mayInterruptIfRunning argument from {@link JobHandle#cancel(boolean)} call.
*/
void cancelled( boolean mayInterruptIfRunning );
}

/** /**
* Assign a specific name to the top-most scheduler group. * Assign a specific name to the top-most scheduler group.
* <p> * <p>
Expand Down
Expand Up @@ -52,6 +52,8 @@
import org.neo4j.kernel.impl.logging.NullLogService; import org.neo4j.kernel.impl.logging.NullLogService;
import org.neo4j.kernel.impl.scheduler.CentralJobScheduler; import org.neo4j.kernel.impl.scheduler.CentralJobScheduler;
import org.neo4j.logging.LogProvider; import org.neo4j.logging.LogProvider;
import org.neo4j.scheduler.Group;
import org.neo4j.scheduler.JobHandle;
import org.neo4j.storageengine.api.schema.IndexReader; import org.neo4j.storageengine.api.schema.IndexReader;
import org.neo4j.test.rule.PageCacheAndDependenciesRule; import org.neo4j.test.rule.PageCacheAndDependenciesRule;
import org.neo4j.test.rule.fs.DefaultFileSystemRule; import org.neo4j.test.rule.fs.DefaultFileSystemRule;
Expand Down
Expand Up @@ -27,6 +27,8 @@
import java.util.List; import java.util.List;


import org.neo4j.helpers.Exceptions; import org.neo4j.helpers.Exceptions;
import org.neo4j.scheduler.Group;
import org.neo4j.scheduler.JobHandle;
import org.neo4j.scheduler.JobSchedulerAdapter; import org.neo4j.scheduler.JobSchedulerAdapter;


import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertThrows;
Expand Down
Expand Up @@ -22,6 +22,7 @@
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;


import org.neo4j.kernel.lifecycle.LifecycleAdapter; import org.neo4j.kernel.lifecycle.LifecycleAdapter;
import org.neo4j.scheduler.JobHandle;
import org.neo4j.scheduler.JobScheduler; import org.neo4j.scheduler.JobScheduler;


import static org.neo4j.scheduler.JobScheduler.Groups.transactionTimeoutMonitor; import static org.neo4j.scheduler.JobScheduler.Groups.transactionTimeoutMonitor;
Expand All @@ -31,7 +32,7 @@ public class KernelTransactionMonitorScheduler extends LifecycleAdapter
private final KernelTransactionTimeoutMonitor kernelTransactionTimeoutMonitor; private final KernelTransactionTimeoutMonitor kernelTransactionTimeoutMonitor;
private final JobScheduler scheduler; private final JobScheduler scheduler;
private final long checkIntervalMillis; private final long checkIntervalMillis;
private JobScheduler.JobHandle monitorJobHandle; private JobHandle monitorJobHandle;


public KernelTransactionMonitorScheduler( KernelTransactionTimeoutMonitor kernelTransactionTimeoutMonitor, public KernelTransactionMonitorScheduler( KernelTransactionTimeoutMonitor kernelTransactionTimeoutMonitor,
JobScheduler scheduler, long checkIntervalMillis ) JobScheduler scheduler, long checkIntervalMillis )
Expand Down
Expand Up @@ -31,7 +31,7 @@
import org.neo4j.kernel.impl.api.index.IndexMapSnapshotProvider; import org.neo4j.kernel.impl.api.index.IndexMapSnapshotProvider;
import org.neo4j.kernel.impl.api.index.IndexProxy; import org.neo4j.kernel.impl.api.index.IndexProxy;
import org.neo4j.scheduler.JobScheduler; import org.neo4j.scheduler.JobScheduler;
import org.neo4j.scheduler.JobScheduler.JobHandle; import org.neo4j.scheduler.JobHandle;


import static java.util.concurrent.TimeUnit.SECONDS; import static java.util.concurrent.TimeUnit.SECONDS;
import static org.neo4j.kernel.impl.api.index.sampling.IndexSamplingMode.BACKGROUND_REBUILD_UPDATED; import static org.neo4j.kernel.impl.api.index.sampling.IndexSamplingMode.BACKGROUND_REBUILD_UPDATED;
Expand Down
Expand Up @@ -31,6 +31,8 @@


import org.neo4j.helpers.Exceptions; import org.neo4j.helpers.Exceptions;
import org.neo4j.kernel.lifecycle.LifecycleAdapter; import org.neo4j.kernel.lifecycle.LifecycleAdapter;
import org.neo4j.scheduler.Group;
import org.neo4j.scheduler.JobHandle;
import org.neo4j.scheduler.JobScheduler; import org.neo4j.scheduler.JobScheduler;
import org.neo4j.time.Clocks; import org.neo4j.time.Clocks;


Expand Down
Expand Up @@ -24,14 +24,14 @@
import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;


import org.neo4j.scheduler.JobScheduler; import org.neo4j.scheduler.Group;


final class GroupedDaemonThreadFactory implements ThreadFactory, ForkJoinPool.ForkJoinWorkerThreadFactory final class GroupedDaemonThreadFactory implements ThreadFactory, ForkJoinPool.ForkJoinWorkerThreadFactory
{ {
private final JobScheduler.Group group; private final Group group;
private final ThreadGroup threadGroup; private final ThreadGroup threadGroup;


GroupedDaemonThreadFactory( JobScheduler.Group group, ThreadGroup parentThreadGroup ) GroupedDaemonThreadFactory( Group group, ThreadGroup parentThreadGroup )
{ {
this.group = group; this.group = group;
threadGroup = new ThreadGroup( parentThreadGroup, group.name() ); threadGroup = new ThreadGroup( parentThreadGroup, group.name() );
Expand Down
Expand Up @@ -25,14 +25,15 @@
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future; import java.util.concurrent.Future;


import org.neo4j.scheduler.JobScheduler; import org.neo4j.scheduler.CancelListener;
import org.neo4j.scheduler.JobHandle;


final class PooledJobHandle implements JobScheduler.JobHandle final class PooledJobHandle implements JobHandle
{ {
private final Future<?> future; private final Future<?> future;
private final Object registryKey; private final Object registryKey;
private final ConcurrentHashMap<Object,Future<?>> registry; private final ConcurrentHashMap<Object,Future<?>> registry;
private final List<JobScheduler.CancelListener> cancelListeners = new CopyOnWriteArrayList<>(); private final List<CancelListener> cancelListeners = new CopyOnWriteArrayList<>();


PooledJobHandle( Future<?> future, Object registryKey, ConcurrentHashMap<Object,Future<?>> registry ) PooledJobHandle( Future<?> future, Object registryKey, ConcurrentHashMap<Object,Future<?>> registry )
{ {
Expand All @@ -45,7 +46,7 @@ final class PooledJobHandle implements JobScheduler.JobHandle
public void cancel( boolean mayInterruptIfRunning ) public void cancel( boolean mayInterruptIfRunning )
{ {
future.cancel( mayInterruptIfRunning ); future.cancel( mayInterruptIfRunning );
for ( JobScheduler.CancelListener cancelListener : cancelListeners ) for ( CancelListener cancelListener : cancelListeners )
{ {
cancelListener.cancelled( mayInterruptIfRunning ); cancelListener.cancelled( mayInterruptIfRunning );
} }
Expand All @@ -59,7 +60,7 @@ public void waitTermination() throws InterruptedException, ExecutionException
} }


@Override @Override
public void registerCancelListener( JobScheduler.CancelListener listener ) public void registerCancelListener( CancelListener listener )
{ {
cancelListeners.add( listener ); cancelListeners.add( listener );
} }
Expand Down
Expand Up @@ -24,9 +24,9 @@
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;


import org.neo4j.scheduler.JobScheduler; import org.neo4j.scheduler.Group;
import org.neo4j.scheduler.JobScheduler.CancelListener; import org.neo4j.scheduler.CancelListener;
import org.neo4j.scheduler.JobScheduler.JobHandle; import org.neo4j.scheduler.JobHandle;
import org.neo4j.util.concurrent.BinaryLatch; import org.neo4j.util.concurrent.BinaryLatch;


/** /**
Expand Down Expand Up @@ -62,14 +62,14 @@ final class ScheduledJobHandle extends AtomicInteger implements JobHandle
// or happens after the relevant handles have been added to the queue. // or happens after the relevant handles have been added to the queue.
long nextDeadlineNanos; long nextDeadlineNanos;


private final JobScheduler.Group group; private final Group group;
private final CopyOnWriteArrayList<CancelListener> cancelListeners; private final CopyOnWriteArrayList<CancelListener> cancelListeners;
private final BinaryLatch handleRelease; private final BinaryLatch handleRelease;
private final Runnable task; private final Runnable task;
private volatile JobHandle latestHandle; private volatile JobHandle latestHandle;
private volatile Throwable lastException; private volatile Throwable lastException;


ScheduledJobHandle( TimeBasedTaskScheduler scheduler, JobScheduler.Group group, Runnable task, ScheduledJobHandle( TimeBasedTaskScheduler scheduler, Group group, Runnable task,
long nextDeadlineNanos, long reschedulingDelayNanos ) long nextDeadlineNanos, long reschedulingDelayNanos )
{ {
this.group = group; this.group = group;
Expand Down

0 comments on commit ff6f225

Please sign in to comment.