Skip to content

Commit

Permalink
Make it possible to set the CentralJobScheduler top-level group name.
Browse files Browse the repository at this point in the history
And have the causal cluster code use this to publish and tag their cluster member identities to their threads.
  • Loading branch information
chrisvest committed Apr 19, 2018
1 parent b0e8b93 commit 4f6535f
Show file tree
Hide file tree
Showing 10 changed files with 84 additions and 4 deletions.
Expand Up @@ -255,6 +255,14 @@ interface CancelListener
void cancelled( boolean mayInterruptIfRunning ); void cancelled( boolean mayInterruptIfRunning );
} }


/**
* Assign a specific name to the top-most scheduler group.
* <p>
* This is just a suggestion for debugging purpose. The specific scheduler implementation is free to ignore calls
* to this method.
*/
void setTopLevelGroupName( String name );

/** Expose a group scheduler as an {@link Executor} */ /** Expose a group scheduler as an {@link Executor} */
Executor executor( Group group ); Executor executor( Group group );


Expand Down
Expand Up @@ -46,6 +46,11 @@ public void shutdown()
{ // no-op { // no-op
} }


@Override
public void setTopLevelGroupName( String name )
{
}

@Override @Override
public Executor executor( Group group ) public Executor executor( Group group )
{ {
Expand Down
Expand Up @@ -19,6 +19,7 @@
*/ */
package org.neo4j.kernel.impl.scheduler; package org.neo4j.kernel.impl.scheduler;


import java.lang.reflect.Field;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
Expand Down Expand Up @@ -47,15 +48,30 @@ public class CentralJobScheduler extends LifecycleAdapter implements JobSchedule
// fashion. // fashion.
private final ConcurrentHashMap<Group,ExecutorService> workStealingExecutors; private final ConcurrentHashMap<Group,ExecutorService> workStealingExecutors;


private final ThreadGroup topLevelGroup; private final TopLevelGroup topLevelGroup;
private final ThreadPoolManager pools; private final ThreadPoolManager pools;


private volatile boolean started; private volatile boolean started;


private static class TopLevelGroup extends ThreadGroup
{
TopLevelGroup()
{
super ( "Neo4j-" + INSTANCE_COUNTER.incrementAndGet() );
}

public void setName( String name ) throws Exception
{
Field field = ThreadGroup.class.getDeclaredField( "name" );
field.setAccessible( true );
field.set( this, name );
}
}

public CentralJobScheduler() public CentralJobScheduler()
{ {
workStealingExecutors = new ConcurrentHashMap<>( 1 ); workStealingExecutors = new ConcurrentHashMap<>( 1 );
topLevelGroup = new ThreadGroup( "Neo4j-" + INSTANCE_COUNTER.incrementAndGet() ); topLevelGroup = new TopLevelGroup();
pools = new ThreadPoolManager( topLevelGroup ); pools = new ThreadPoolManager( topLevelGroup );
ThreadFactory threadFactory = new GroupedDaemonThreadFactory( SCHEDULER_GROUP, topLevelGroup ); ThreadFactory threadFactory = new GroupedDaemonThreadFactory( SCHEDULER_GROUP, topLevelGroup );
scheduler = new TimeBasedTaskScheduler( Clocks.nanoClock(), pools ); scheduler = new TimeBasedTaskScheduler( Clocks.nanoClock(), pools );
Expand All @@ -66,6 +82,18 @@ public CentralJobScheduler()
schedulerThread.setPriority( priority ); schedulerThread.setPriority( priority );
} }


@Override
public void setTopLevelGroupName( String name )
{
try
{
topLevelGroup.setName( name );
}
catch ( Exception ignore )
{
}
}

@Override @Override
public void init() public void init()
{ {
Expand Down
Expand Up @@ -40,7 +40,23 @@ final class GroupedDaemonThreadFactory implements ThreadFactory, ForkJoinPool.Fo
@Override @Override
public Thread newThread( @SuppressWarnings( "NullableProblems" ) Runnable job ) public Thread newThread( @SuppressWarnings( "NullableProblems" ) Runnable job )
{ {
Thread thread = new Thread( threadGroup, job, group.threadName() ); Thread thread = new Thread( threadGroup, job, group.threadName() )
{
@Override
public String toString()
{
StringBuilder sb = new StringBuilder( "Thread[" ).append( getName() );
ThreadGroup group = getThreadGroup();
String sep = ", in ";
while ( group != null )
{
sb.append( sep ).append( group.getName() );
group = group.getParent();
sep = "/";
}
return sb.append( ']' ).toString();
}
};
thread.setDaemon( true ); thread.setDaemon( true );
return thread; return thread;
} }
Expand Down
Expand Up @@ -39,6 +39,12 @@ public CountingJobScheduler( AtomicInteger counter, CentralJobScheduler delegate
this.delegate = delegate; this.delegate = delegate;
} }


@Override
public void setTopLevelGroupName( String name )
{
delegate.setTopLevelGroupName( name );
}

@Override @Override
public Executor executor( Group group ) public Executor executor( Group group )
{ {
Expand Down
Expand Up @@ -88,6 +88,11 @@ private long now()
return instant().toEpochMilli(); return instant().toEpochMilli();
} }


@Override
public void setTopLevelGroupName( String name )
{
}

@Override @Override
public Executor executor( Group group ) public Executor executor( Group group )
{ {
Expand Down
Expand Up @@ -20,7 +20,6 @@
package org.neo4j.test; package org.neo4j.test;


import java.util.List; import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
Expand All @@ -46,6 +45,11 @@ public OnDemandJobScheduler( boolean removeJobsAfterExecution )
this.removeJobsAfterExecution = removeJobsAfterExecution; this.removeJobsAfterExecution = removeJobsAfterExecution;
} }


@Override
public void setTopLevelGroupName( String name )
{
}

@Override @Override
public Executor executor( Group group ) public Executor executor( Group group )
{ {
Expand Down
Expand Up @@ -70,6 +70,8 @@ public class IdentityModule
{ {
throw new RuntimeException( e ); throw new RuntimeException( e );
} }

platformModule.jobScheduler.setTopLevelGroupName( "Core " + myself );
} }


public MemberId myself() public MemberId myself()
Expand Down
Expand Up @@ -152,6 +152,7 @@ public EnterpriseReadReplicaEditionModule( final PlatformModule platformModule,
LogService logging = platformModule.logging; LogService logging = platformModule.logging;


ioLimiter = new ConfigurableIOLimiter( platformModule.config ); ioLimiter = new ConfigurableIOLimiter( platformModule.config );
platformModule.jobScheduler.setTopLevelGroupName( "ReadReplica " + myself );


org.neo4j.kernel.impl.util.Dependencies dependencies = platformModule.dependencies; org.neo4j.kernel.impl.util.Dependencies dependencies = platformModule.dependencies;
Config config = platformModule.config; Config config = platformModule.config;
Expand Down
Expand Up @@ -164,6 +164,11 @@ void setActions( Runnable... actions )


private class NoopJobScheduler extends LifecycleAdapter implements JobScheduler private class NoopJobScheduler extends LifecycleAdapter implements JobScheduler
{ {
@Override
public void setTopLevelGroupName( String name )
{
}

@Override @Override
public Executor executor( Group group ) public Executor executor( Group group )
{ {
Expand Down

0 comments on commit 4f6535f

Please sign in to comment.