Add more clean-up logic and handle transactions on the same thread
ali-ince committed Mar 2, 2018
1 parent a8384d2 commit 6534a28
Showing 23 changed files with 795 additions and 192 deletions.
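
The heart of the change is DefaultBoltConnection.processNextBatch() (in the DefaultBoltConnection hunks below): instead of draining queued jobs once and returning the worker thread to the pool, the connection now keeps looping on the same thread while the state machine reports it should stick there (machine.shouldStickOnThread(), e.g. while an explicit transaction is open), polling 10 seconds at a time for the next client message. A minimal standalone sketch of that pattern, with StateMachine and Job as stand-ins for the real Bolt types:

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Standalone sketch of the stick-on-thread batch loop; StateMachine and
// Job are simplified stand-ins, not the committed Neo4j classes.
class StickyBatchLoop
{
    interface Job { void perform( StateMachine machine ) throws Exception; }
    interface StateMachine { boolean shouldStickOnThread(); }

    private final LinkedBlockingQueue<Job> queue = new LinkedBlockingQueue<>();
    private final List<Job> batch = new ArrayList<>();
    private final StateMachine machine;

    StickyBatchLoop( StateMachine machine ) { this.machine = machine; }

    void processNextBatch( int batchCount ) throws Exception
    {
        boolean waitForMessage = false;
        boolean loop;
        do
        {
            queue.drainTo( batch, batchCount );
            if ( batch.isEmpty() && waitForMessage )
            {
                // Block on this same worker thread until the client sends
                // the next message of the still-open transaction.
                Job next;
                while ( ( next = queue.poll( 10, TimeUnit.SECONDS ) ) == null )
                {
                    // keep waiting; the real code also re-checks close flags
                }
                batch.add( next );
            }
            while ( !batch.isEmpty() )
            {
                batch.remove( 0 ).perform( machine );
            }
            // Stay on this thread for the next batch while the machine asks.
            loop = machine.shouldStickOnThread();
            waitForMessage = loop;
        }
        while ( loop );
    }
}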
@@ -34,7 +34,6 @@
 import org.neo4j.bolt.runtime.CachedThreadPoolExecutorFactory;
 import org.neo4j.bolt.runtime.DefaultBoltConnectionFactory;
 import org.neo4j.bolt.runtime.ExecutorBoltSchedulerProvider;
-import org.neo4j.bolt.runtime.MetricsReportingBoltConnectionFactory;
 import org.neo4j.bolt.security.auth.Authentication;
 import org.neo4j.bolt.security.auth.BasicAuthentication;
 import org.neo4j.bolt.transport.BoltProtocolHandlerFactory;
@@ -182,9 +181,8 @@ protected WorkerFactory createWorkerFactory( BoltFactory boltFactory, JobSchedul
     private BoltConnectionFactory createConnectionFactory( BoltFactory boltFactory, BoltSchedulerProvider schedulerProvider,
             Dependencies dependencies, LogService logService, Clock clock )
     {
-        return new MetricsReportingBoltConnectionFactory( dependencies.monitors(),
-                new DefaultBoltConnectionFactory( boltFactory, schedulerProvider, logService, clock,
-                        new BoltConnectionReadLimiter( logService.getInternalLog( BoltConnectionReadLimiter.class ) ) ), clock );
+        return new DefaultBoltConnectionFactory( boltFactory, schedulerProvider, logService, clock,
+                new BoltConnectionReadLimiter( logService.getInternalLog( BoltConnectionReadLimiter.class ) ), dependencies.monitors() );
     }

     private Map<BoltConnector,ProtocolInitializer> createConnectors( Config config, SslPolicyLoader sslPolicyFactory, LogService logService, Log log,
@@ -89,6 +89,8 @@ public interface BoltConnection
      */
     boolean processNextBatch();

+    void handleSchedulingError( Throwable t );
+
     /**
     * Interrupt and (possibly) stop the current running job, but continue processing next jobs
     */
@@ -20,11 +20,13 @@
 package org.neo4j.bolt.runtime;

 import io.netty.channel.Channel;
+import org.apache.commons.lang3.exception.ExceptionUtils;

 import java.net.SocketAddress;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.atomic.AtomicBoolean;

 import org.neo4j.bolt.BoltChannel;
@@ -33,13 +35,17 @@
 import org.neo4j.bolt.v1.runtime.BoltProtocolBreachFatality;
 import org.neo4j.bolt.v1.runtime.BoltStateMachine;
 import org.neo4j.bolt.v1.runtime.Job;
+import org.neo4j.bolt.v1.runtime.Neo4jError;
+import org.neo4j.kernel.api.exceptions.Status;
 import org.neo4j.kernel.impl.logging.LogService;
 import org.neo4j.logging.Log;
 import org.neo4j.util.FeatureToggles;

+import static java.util.concurrent.TimeUnit.SECONDS;
+
 public class DefaultBoltConnection implements BoltConnection
 {
-    private static final int DEFAULT_MAX_BATCH_SIZE = FeatureToggles.getInteger( BoltKernelExtension.class, "max_batch_size", 100 );
+    protected static final int DEFAULT_MAX_BATCH_SIZE = FeatureToggles.getInteger( BoltKernelExtension.class, "max_batch_size", 100 );

     private final String id;

@@ -129,21 +135,53 @@ public void enqueue( Job job )
 
     @Override
     public boolean processNextBatch()
     {
+        return processNextBatch( maxBatchSize );
+    }
+
+    private boolean processNextBatch( int batchCount )
+    {
         try
         {
-            if ( !queue.isEmpty() )
+            boolean waitForMessage = false;
+            boolean loop = false;
+            do
             {
-                queue.drainTo( batch, maxBatchSize );
-                notifyDrained( batch );
-
-                for ( int i = 0; !shouldClose.get() && i < batch.size(); i++ )
+                if ( !shouldClose.get() )
                 {
-                    Job current = batch.get( i );
-
-                    current.perform( machine );
+                    if ( waitForMessage || !queue.isEmpty() )
+                    {
+                        queue.drainTo( batch, batchCount );
+                        if ( batch.size() == 0 )
+                        {
+                            while ( true )
+                            {
+                                Job nextJob = queue.poll( 10, SECONDS );
+                                if ( nextJob != null )
+                                {
+                                    batch.add( nextJob );
+
+                                    break;
+                                }
+                            }
+                        }
+                        notifyDrained( batch );
+
+                        while ( batch.size() > 0 )
+                        {
+                            Job current = batch.remove( 0 );
+
+                            current.perform( machine );
+                        }
+
+                        loop = machine.shouldStickOnThread();
+                        waitForMessage = loop;
+                    }
                 }
             }
+            while ( loop );
+
+            assert !machine.hasOpenStatement();
         }
         catch ( BoltConnectionAuthFatality ex )
         {
@@ -162,8 +200,6 @@ public boolean processNextBatch()
         }
         finally
         {
-            batch.clear();
-
             if ( shouldClose.get() )
             {
                 close();
@@ -173,6 +209,24 @@
         return !closed.get();
     }

+    @Override
+    public void handleSchedulingError( Throwable t )
+    {
+        Neo4jError error;
+        if ( ExceptionUtils.hasCause( t, RejectedExecutionException.class ) )
+        {
+            error = Neo4jError.from( Status.Request.NoThreadsAvailable, Status.Request.NoThreadsAvailable.code().description() );
+        }
+        else
+        {
+            error = Neo4jError.fatalFrom( t );
+        }
+
+        userLog.error( String.format( "Unexpected error during scheduling of bolt session '%s'.", id() ), t );
+        machine.markFailed( error );
+        processNextBatch( 1 );
+    }
+
     @Override
     public void interrupt()
     {
@@ -188,10 +242,7 @@ public void stop()
 
             if ( !hasPendingJobs() )
             {
-                enqueue( mach ->
-                {
-
-                } );
+                processNextBatch( 0 );
             }
         }
     }
@@ -246,5 +297,4 @@ private void notifyDrained( List<Job> jobs )
             queueMonitor.drained( this, jobs );
         }
     }
-
 }
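
Note how the two new internal call sites use the batch-size argument rather than queued work: handleSchedulingError calls processNextBatch( 1 ) so the failure is processed inline on the calling thread (running at most one queued job against the now-failed state machine), while stop() calls processNextBatch( 0 ) — only taken when there are no pending jobs — which performs no work but still reaches the finally block that closes the connection. A stripped-down sketch of that shutdown path (field names mirror the diff; the real close() does much more):

import java.util.concurrent.atomic.AtomicBoolean;

// Sketch of why processNextBatch( 0 ) suffices for teardown: the work loop
// is a no-op, but the finally block still observes shouldClose and closes.
class ShutdownPathSketch
{
    private final AtomicBoolean shouldClose = new AtomicBoolean();
    private final AtomicBoolean closed = new AtomicBoolean();

    boolean processNextBatch( int batchCount )
    {
        try
        {
            // batchCount == 0 and an empty queue: nothing to drain or perform
        }
        finally
        {
            if ( shouldClose.get() )
            {
                close();
            }
        }
        return !closed.get();
    }

    void stop()
    {
        shouldClose.set( true );
        // Before this commit: enqueue an empty job so a pool thread wakes up
        // and notices shouldClose. Now: run the empty batch inline instead.
        processNextBatch( 0 );
    }

    private void close()
    {
        closed.set( true );
    }
}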
@@ -24,6 +24,7 @@
 import org.neo4j.bolt.BoltChannel;
 import org.neo4j.bolt.v1.runtime.BoltFactory;
 import org.neo4j.kernel.impl.logging.LogService;
+import org.neo4j.kernel.monitoring.Monitors;

 public class DefaultBoltConnectionFactory implements BoltConnectionFactory
 {
@@ -32,15 +33,19 @@ public class DefaultBoltConnectionFactory implements BoltConnectionFactory
     private final LogService logService;
     private final Clock clock;
     private final BoltConnectionQueueMonitor queueMonitor;
+    private final Monitors monitors;
+    private final BoltConnectionMetricsMonitor metricsMonitor;

     public DefaultBoltConnectionFactory( BoltFactory machineFactory, BoltSchedulerProvider schedulerProvider, LogService logService, Clock clock,
-            BoltConnectionQueueMonitor queueMonitor )
+            BoltConnectionQueueMonitor queueMonitor, Monitors monitors )
     {
         this.machineFactory = machineFactory;
         this.schedulerProvider = schedulerProvider;
         this.logService = logService;
         this.clock = clock;
         this.queueMonitor = queueMonitor;
+        this.monitors = monitors;
+        this.metricsMonitor = monitors.newMonitor( BoltConnectionMetricsMonitor.class );
     }

     @Override
@@ -49,12 +54,21 @@ public BoltConnection newConnection( BoltChannel channel )
         BoltScheduler scheduler = schedulerProvider.get( channel );
         BoltConnectionQueueMonitor connectionQueueMonitor =
                 queueMonitor == null ? scheduler : new BoltConnectionQueueMonitorAggregate( scheduler, queueMonitor );
-        BoltConnection connection =
-                new DefaultBoltConnection( channel, machineFactory.newMachine( channel, clock ), logService, scheduler, connectionQueueMonitor );
+
+        BoltConnection connection;
+        if ( monitors.hasListeners( BoltConnectionMetricsMonitor.class ) )
+        {
+            connection =
+                    new MetricsReportingBoltConnection( channel, machineFactory.newMachine( channel, clock ), logService, scheduler, connectionQueueMonitor,
+                            metricsMonitor, clock );
+        }
+        else
+        {
+            connection = new DefaultBoltConnection( channel, machineFactory.newMachine( channel, clock ), logService, scheduler, connectionQueueMonitor );
+        }

         connection.start();

         return connection;
     }
-
 }
@@ -20,12 +20,14 @@
 package org.neo4j.bolt.runtime;

 import com.sun.org.apache.xpath.internal.operations.Bool;
+import org.apache.commons.lang3.exception.ExceptionUtils;

 import java.time.Duration;
 import java.util.Collection;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.RejectedExecutionException;

 import org.neo4j.bolt.v1.runtime.Job;
 import org.neo4j.kernel.impl.logging.LogService;
@@ -43,11 +45,12 @@ public class ExecutorBoltScheduler implements BoltScheduler, BoltConnectionLifet
     private final int maxPoolSize;
     private final Duration keepAlive;
     private final int queueSize;
+    private final ExecutorService forkJoinPool;

     private ExecutorService threadPool;

     public ExecutorBoltScheduler( ExecutorFactory executorFactory, JobScheduler scheduler, LogService logService, int corePoolSize, int maxPoolSize,
-            Duration keepAlive, int queueSize )
+            Duration keepAlive, int queueSize, ExecutorService forkJoinPool )
     {
         this.executorFactory = executorFactory;
         this.scheduler = scheduler;
@@ -56,6 +59,7 @@ public ExecutorBoltScheduler( ExecutorFactory executorFactory, JobScheduler sche
         this.maxPoolSize = maxPoolSize;
         this.keepAlive = keepAlive;
         this.queueSize = queueSize;
+        this.forkJoinPool = forkJoinPool;
     }

     boolean isRegistered( BoltConnection connection )
@@ -117,8 +121,16 @@ public void drained( BoltConnection from, Collection<Job> batch )
 
     private void handleSubmission( BoltConnection connection )
     {
-        activeWorkItems.computeIfAbsent( connection.id(), key -> CompletableFuture.supplyAsync( connection::processNextBatch, threadPool ).whenCompleteAsync(
-                ( result, error ) -> handleCompletion( connection, result, error ), threadPool ) );
+        try
+        {
+            activeWorkItems.computeIfAbsent( connection.id(),
+                    key -> CompletableFuture.supplyAsync( connection::processNextBatch, threadPool ).whenCompleteAsync(
+                            ( result, error ) -> handleCompletion( connection, result, error ), forkJoinPool ) );
+        }
+        catch ( RejectedExecutionException ex )
+        {
+            connection.handleSchedulingError( ex );
+        }
     }

     private void handleCompletion( BoltConnection connection, Object shouldContinueScheduling, Throwable error )
@@ -127,14 +139,21 @@ private void handleCompletion( BoltConnection connection, Object shouldContinueS
 
         if ( error != null )
         {
-            log.error( String.format( "Unexpected error during job scheduling for session '%s'.", connection.id() ), error );
-            connection.stop();
+            if ( ExceptionUtils.hasCause( error, RejectedExecutionException.class ) )
+            {
+                connection.handleSchedulingError( error );
+            }
+            else
+            {
+                log.error( String.format( "Unexpected error during job scheduling for session '%s'.", connection.id() ), error );
+                connection.stop();
+            }
         }
         else
         {
             if ( (Boolean)shouldContinueScheduling && connection.hasPendingJobs() )
             {
-                previousFuture.thenAcceptAsync( ignore -> handleSubmission( connection ), threadPool );
+                previousFuture.thenAcceptAsync( ignore -> handleSubmission( connection ), forkJoinPool );
             }
         }
     }
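
handleCompletion receives its error from a CompletableFuture stage, and such errors arrive wrapped (typically in a CompletionException), so a plain instanceof test would miss a buried RejectedExecutionException; that is why the check uses ExceptionUtils.hasCause, which walks the cause chain. A small runnable illustration (assumes commons-lang3 3.5+, which the diff already imports):

import java.util.concurrent.CompletionException;
import java.util.concurrent.RejectedExecutionException;

import org.apache.commons.lang3.exception.ExceptionUtils;

// Demonstrates why the cause chain must be walked: the rejection is hidden
// inside the CompletionException a CompletableFuture hands to callbacks.
public class CauseChainDemo
{
    public static void main( String[] args )
    {
        Throwable error = new CompletionException( new RejectedExecutionException( "pool saturated" ) );

        System.out.println( error instanceof RejectedExecutionException );                        // false
        System.out.println( ExceptionUtils.hasCause( error, RejectedExecutionException.class ) ); // true
    }
}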
@@ -20,6 +20,8 @@
 package org.neo4j.bolt.runtime;

 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ForkJoinPool;

 import org.neo4j.bolt.BoltChannel;
 import org.neo4j.kernel.configuration.Config;
@@ -35,6 +37,8 @@ public class ExecutorBoltSchedulerProvider extends LifecycleAdapter implements B
     private final LogService logService;
     private final ConcurrentHashMap<String, BoltScheduler> boltSchedulers;

+    private ExecutorService forkJoinThreadPool;
+
     public ExecutorBoltSchedulerProvider( Config config, ExecutorFactory executorFactory, JobScheduler scheduler, LogService logService )
     {
         this.config = config;
@@ -47,12 +51,13 @@
     @Override
     public void start() throws Throwable
     {
+        forkJoinThreadPool = new ForkJoinPool();
         config.enabledBoltConnectors().forEach( connector ->
         {
             BoltScheduler boltScheduler =
                     new ExecutorBoltScheduler( executorFactory, scheduler, logService, config.get( connector.thread_pool_core_size ),
                             config.get( connector.thread_pool_max_size ), config.get( connector.thread_pool_keep_alive ),
-                            config.get( connector.thread_pool_queue_size ) );
+                            config.get( connector.thread_pool_queue_size ), forkJoinThreadPool );
             boltScheduler.start();
             boltSchedulers.put( connector.key(), boltScheduler );
         } );
@@ -63,6 +68,9 @@ public void stop() throws Throwable
     {
         boltSchedulers.values().forEach( s -> s.stop() );
         boltSchedulers.clear();
+
+        forkJoinThreadPool.shutdown();
+        forkJoinThreadPool = null;
     }

     @Override
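
Taken together, the scheduler changes split the two thread pools: connection jobs still run on the bounded, per-connector threadPool, while completion handling and re-submission hop to a ForkJoinPool owned by the provider. A plausible reading of the motivation (the commit message does not spell it out): if the completion stage were scheduled on the same bounded pool, a saturated pool would reject its own error handling, so the failure could never reach handleSchedulingError. The toy program below, not Neo4j code, shows the synchronous rejection that handleSubmission now catches:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Toy demo: a bounded, queue-limited pool that is already full rejects new
// submissions with RejectedExecutionException -- the case handleSubmission
// now catches and routes to handleSchedulingError.
public class SaturatedPoolDemo
{
    public static void main( String[] args )
    {
        ThreadPoolExecutor pool = new ThreadPoolExecutor( 1, 1, 0, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>( 1 ) );

        pool.execute( SaturatedPoolDemo::sleep );  // occupies the single worker
        pool.execute( SaturatedPoolDemo::sleep );  // occupies the single queue slot

        try
        {
            CompletableFuture.supplyAsync( () -> "job", pool );
        }
        catch ( RejectedExecutionException ex )
        {
            // This is where the connection would surface NoThreadsAvailable.
            System.out.println( "rejected: " + ex.getMessage() );
        }

        pool.shutdownNow();
    }

    private static void sleep()
    {
        try
        {
            Thread.sleep( 1000 );
        }
        catch ( InterruptedException ignored )
        {
        }
    }
}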
