Skip to content

Commit

Permalink
Address review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
ali-ince committed Mar 2, 2018
1 parent e370d14 commit ed2497e
Show file tree
Hide file tree
Showing 19 changed files with 107 additions and 154 deletions.
Expand Up @@ -22,6 +22,7 @@
import io.netty.channel.Channel; import io.netty.channel.Channel;


import java.net.SocketAddress; import java.net.SocketAddress;
import java.util.concurrent.RejectedExecutionException;


import org.neo4j.bolt.v1.runtime.Job; import org.neo4j.bolt.v1.runtime.Job;


Expand Down Expand Up @@ -56,13 +57,6 @@ public interface BoltConnection
*/ */
Channel channel(); Channel channel();


/**
* Returns the principal the client used to connect
*
* @return user name
*/
String principal();

/** /**
* Returns whether there's any pending Job waiting to be processed * Returns whether there's any pending Job waiting to be processed
* *
Expand All @@ -89,6 +83,13 @@ public interface BoltConnection
*/ */
boolean processNextBatch(); boolean processNextBatch();


/**
* Invoked when an exception is caught during the scheduling of the pending jobs. The caught exception would mostly
* be {@link RejectedExecutionException} which is thrown by the thread pool executor when it fails to accept
* submitted jobs
*
* @param t the exception occurred during scheduling
*/
void handleSchedulingError( Throwable t ); void handleSchedulingError( Throwable t );


/** /**
Expand Down
Expand Up @@ -33,7 +33,7 @@ public class BoltConnectionQueueMonitorAggregate implements BoltConnectionQueueM


public BoltConnectionQueueMonitorAggregate( BoltConnectionQueueMonitor... monitors ) public BoltConnectionQueueMonitorAggregate( BoltConnectionQueueMonitor... monitors )
{ {
this.monitors = new ArrayList<>( Arrays.asList( monitors ) ); this.monitors = Arrays.asList( monitors );
} }


@Override @Override
Expand Down
Expand Up @@ -75,13 +75,13 @@ protected int getHighWatermark()
@Override @Override
public void enqueued( BoltConnection to, Job job ) public void enqueued( BoltConnection to, Job job )
{ {
checkLimitsOnEnqueue( to, counters.compute( to.id(), ( k, v ) -> v == null ? new AtomicInteger( 0 ) : v ).incrementAndGet() ); checkLimitsOnEnqueue( to, counters.computeIfAbsent( to.id(), k -> new AtomicInteger( 0 ) ).incrementAndGet() );
} }


@Override @Override
public void drained( BoltConnection from, Collection<Job> batch ) public void drained( BoltConnection from, Collection<Job> batch )
{ {
checkLimitsOnDequeue( from, counters.compute( from.id(), ( k, v ) -> v == null ? new AtomicInteger( 0 ) : v ).addAndGet( -batch.size() ) ); checkLimitsOnDequeue( from, counters.computeIfAbsent( from.id(), k -> new AtomicInteger( 0 ) ).addAndGet( -batch.size() ) );
} }


private void checkLimitsOnEnqueue( BoltConnection connection, int currentSize ) private void checkLimitsOnEnqueue( BoltConnection connection, int currentSize )
Expand Down
Expand Up @@ -22,6 +22,8 @@
public interface BoltScheduler extends BoltConnectionLifetimeListener, BoltConnectionQueueMonitor public interface BoltScheduler extends BoltConnectionLifetimeListener, BoltConnectionQueueMonitor
{ {


String connector();

void start(); void start();


void stop(); void stop();
Expand Down
Expand Up @@ -61,48 +61,19 @@ public ExecutorService create( int corePoolSize, int maxPoolSize, Duration keepA
return result; return result;
} }


@Override private static BlockingQueue<Runnable> createTaskQueue( int queueSize )
public void destroy( ExecutorService executor )
{
if ( !( executor instanceof ThreadPool ) )
{
throw new IllegalArgumentException(
String.format( "The passed executor should already be created by '%s#create()'.", CachedThreadPoolExecutorFactory.class.getName() ) );
}

executor.shutdown();
try
{
if ( !executor.awaitTermination( 10, TimeUnit.SECONDS ) )
{
executor.shutdownNow();

if ( !executor.awaitTermination( 10, TimeUnit.SECONDS ) )
{
log.error( "Thread pool did not terminate gracefully despite all efforts" );
}
}
}
catch ( InterruptedException ex )
{
executor.shutdownNow();
Thread.currentThread().interrupt();
}
}

private static BlockingQueue createTaskQueue( int queueSize )
{ {
if ( queueSize == UNBOUNDED_QUEUE ) if ( queueSize == UNBOUNDED_QUEUE )
{ {
return new LinkedBlockingQueue(); return new LinkedBlockingQueue<>();
} }
else if ( queueSize == SYNCHRONOUS_QUEUE ) else if ( queueSize == SYNCHRONOUS_QUEUE )
{ {
return new SynchronousQueue(); return new SynchronousQueue<>();
} }
else if ( queueSize > 0 ) else if ( queueSize > 0 )
{ {
return new ArrayBlockingQueue( queueSize ); return new ArrayBlockingQueue<>( queueSize );
} }


throw new IllegalArgumentException( String.format( "Unsupported queue size %d for thread pool creation.", queueSize ) ); throw new IllegalArgumentException( String.format( "Unsupported queue size %d for thread pool creation.", queueSize ) );
Expand Down
Expand Up @@ -108,12 +108,6 @@ public Channel channel()
return channel.rawChannel(); return channel.rawChannel();
} }


@Override
public String principal()
{
return machine.owner();
}

@Override @Override
public boolean hasPendingJobs() public boolean hasPendingJobs()
{ {
Expand All @@ -129,8 +123,7 @@ public void start()
@Override @Override
public void enqueue( Job job ) public void enqueue( Job job )
{ {
queue.offer( job ); enqueueInternal( job );
notifyEnqueued( job );
} }


@Override @Override
Expand Down Expand Up @@ -254,10 +247,11 @@ public void stop()
{ {
machine.terminate(); machine.terminate();


if ( !hasPendingJobs() ) // Enqueue an empty job for close to be handled linearly
enqueueInternal( ignore ->
{ {
processNextBatch( 0 );
} } );
} }
} }


Expand All @@ -280,6 +274,12 @@ private void close()
} }
} }


private void enqueueInternal( Job job )
{
queue.offer( job );
notifyEnqueued( job );
}

private void notifyCreated() private void notifyCreated()
{ {
if ( listener != null ) if ( listener != null )
Expand Down
Expand Up @@ -51,8 +51,7 @@ public class ExecutorBoltScheduler implements BoltScheduler, BoltConnectionLifet
private ExecutorService threadPool; private ExecutorService threadPool;


public ExecutorBoltScheduler( String connector, ExecutorFactory executorFactory, JobScheduler scheduler, LogService logService, int corePoolSize, public ExecutorBoltScheduler( String connector, ExecutorFactory executorFactory, JobScheduler scheduler, LogService logService, int corePoolSize,
int maxPoolSize, int maxPoolSize, Duration keepAlive, int queueSize, ExecutorService forkJoinPool )
Duration keepAlive, int queueSize, ExecutorService forkJoinPool )
{ {
this.connector = connector; this.connector = connector;
this.executorFactory = executorFactory; this.executorFactory = executorFactory;
Expand All @@ -75,24 +74,36 @@ boolean isActive( BoltConnection connection )
return activeWorkItems.containsKey( connection.id() ); return activeWorkItems.containsKey( connection.id() );
} }


@Override
public String connector()
{
return connector;
}

@Override
public void start() public void start()
{ {
threadPool = executorFactory.create( corePoolSize, maxPoolSize, keepAlive, queueSize, threadPool = executorFactory.create( corePoolSize, maxPoolSize, keepAlive, queueSize,
new NameAppendingThreadFactory( connector, scheduler.threadFactory( JobScheduler.Groups.boltWorker ) ) ); new NameAppendingThreadFactory( connector, scheduler.threadFactory( JobScheduler.Groups.boltWorker ) ) );
} }


@Override
public void stop() public void stop()
{ {
if ( threadPool != null ) if ( threadPool != null )
{ {
executorFactory.destroy( threadPool ); activeConnections.values().forEach( this::stopConnection );

threadPool.shutdown();
} }
} }


@Override @Override
public void created( BoltConnection connection ) public void created( BoltConnection connection )
{ {
activeConnections.put( connection.id(), connection ); BoltConnection previous = activeConnections.put( connection.id(), connection );
// We do not expect the same (keyed) connection twice
assert previous == null;
} }


@Override @Override
Expand All @@ -102,7 +113,7 @@ public void closed( BoltConnection connection )


try try
{ {
CompletableFuture currentFuture = activeWorkItems.remove( id ); CompletableFuture<Boolean> currentFuture = activeWorkItems.remove( id );
if ( currentFuture != null ) if ( currentFuture != null )
{ {
currentFuture.cancel( true ); currentFuture.cancel( true );
Expand Down Expand Up @@ -144,7 +155,7 @@ private boolean executeBatch( BoltConnection connection )
{ {
Thread currentThread = Thread.currentThread(); Thread currentThread = Thread.currentThread();
String originalName = currentThread.getName(); String originalName = currentThread.getName();
String newName = String.format( "%s [%s] ", originalName, connection.remoteAddress(), connector ); String newName = String.format( "%s [%s] ", originalName, connection.remoteAddress() );


currentThread.setName( newName ); currentThread.setName( newName );
try try
Expand All @@ -157,7 +168,7 @@ private boolean executeBatch( BoltConnection connection )
} }
} }


private void handleCompletion( BoltConnection connection, Object shouldContinueScheduling, Throwable error ) private void handleCompletion( BoltConnection connection, Boolean shouldContinueScheduling, Throwable error )
{ {
CompletableFuture<Boolean> previousFuture = activeWorkItems.remove( connection.id() ); CompletableFuture<Boolean> previousFuture = activeWorkItems.remove( connection.id() );


Expand All @@ -175,13 +186,25 @@ private void handleCompletion( BoltConnection connection, Object shouldContinueS
} }
else else
{ {
if ( (Boolean)shouldContinueScheduling && connection.hasPendingJobs() ) if ( shouldContinueScheduling && connection.hasPendingJobs() )
{ {
previousFuture.thenAcceptAsync( ignore -> handleSubmission( connection ), forkJoinPool ); handleSubmission( connection );
} }
} }
} }


private void stopConnection( BoltConnection connection )
{
try
{
connection.stop();
}
catch ( Throwable t )
{
log.warn( String.format( "An unexpected error occurred while stopping BoltConnection [%s]", connection.id() ), t );
}
}

private static class NameAppendingThreadFactory implements ThreadFactory private static class NameAppendingThreadFactory implements ThreadFactory
{ {
private final String nameToAppend; private final String nameToAppend;
Expand Down
Expand Up @@ -27,6 +27,7 @@
import org.neo4j.kernel.configuration.Config; import org.neo4j.kernel.configuration.Config;
import org.neo4j.kernel.impl.logging.LogService; import org.neo4j.kernel.impl.logging.LogService;
import org.neo4j.kernel.lifecycle.LifecycleAdapter; import org.neo4j.kernel.lifecycle.LifecycleAdapter;
import org.neo4j.logging.Log;
import org.neo4j.scheduler.JobScheduler; import org.neo4j.scheduler.JobScheduler;


public class ExecutorBoltSchedulerProvider extends LifecycleAdapter implements BoltSchedulerProvider public class ExecutorBoltSchedulerProvider extends LifecycleAdapter implements BoltSchedulerProvider
Expand All @@ -35,6 +36,7 @@ public class ExecutorBoltSchedulerProvider extends LifecycleAdapter implements B
private final ExecutorFactory executorFactory; private final ExecutorFactory executorFactory;
private final JobScheduler scheduler; private final JobScheduler scheduler;
private final LogService logService; private final LogService logService;
private final Log internalLog;
private final ConcurrentHashMap<String, BoltScheduler> boltSchedulers; private final ConcurrentHashMap<String, BoltScheduler> boltSchedulers;


private ExecutorService forkJoinThreadPool; private ExecutorService forkJoinThreadPool;
Expand All @@ -45,11 +47,12 @@ public ExecutorBoltSchedulerProvider( Config config, ExecutorFactory executorFac
this.executorFactory = executorFactory; this.executorFactory = executorFactory;
this.scheduler = scheduler; this.scheduler = scheduler;
this.logService = logService; this.logService = logService;
this.internalLog = logService.getInternalLog( getClass() );
this.boltSchedulers = new ConcurrentHashMap<>(); this.boltSchedulers = new ConcurrentHashMap<>();
} }


@Override @Override
public void start() throws Throwable public void start()
{ {
forkJoinThreadPool = new ForkJoinPool(); forkJoinThreadPool = new ForkJoinPool();
config.enabledBoltConnectors().forEach( connector -> config.enabledBoltConnectors().forEach( connector ->
Expand All @@ -64,15 +67,27 @@ public void start() throws Throwable
} }


@Override @Override
public void stop() throws Throwable public void stop()
{ {
boltSchedulers.values().forEach( s -> s.stop() ); boltSchedulers.values().forEach( this::stopScheduler );
boltSchedulers.clear(); boltSchedulers.clear();


forkJoinThreadPool.shutdown(); forkJoinThreadPool.shutdown();
forkJoinThreadPool = null; forkJoinThreadPool = null;
} }


private void stopScheduler( BoltScheduler scheduler )
{
try
{
scheduler.stop();
}
catch ( Throwable t )
{
internalLog.warn( String.format( "An unexpected error occurred while stopping BoltScheduler [%s]", scheduler.connector() ), t );
}
}

@Override @Override
public BoltScheduler get( BoltChannel channel ) public BoltScheduler get( BoltChannel channel )
{ {
Expand Down
Expand Up @@ -28,6 +28,4 @@ public interface ExecutorFactory


ExecutorService create( int corePoolSize, int maxPoolSize, Duration keepAlive, int queueSize, ThreadFactory threadFactory ); ExecutorService create( int corePoolSize, int maxPoolSize, Duration keepAlive, int queueSize, ThreadFactory threadFactory );


void destroy( ExecutorService executor );

} }
Expand Up @@ -150,13 +150,13 @@ public BoltResult start() throws KernelException
} }
catch ( KernelException e ) catch ( KernelException e )
{ {
transactionalContext.close( false ); close( false );
onFail.apply(); onFail.apply();
throw new QueryExecutionKernelException( e ); throw new QueryExecutionKernelException( e );
} }
catch ( Throwable e ) catch ( Throwable e )
{ {
transactionalContext.close( false ); close( false );
onFail.apply(); onFail.apply();
throw e; throw e;
} }
Expand Down

0 comments on commit ed2497e

Please sign in to comment.