Skip to content

Commit

Permalink
Moved all bolt versioned components into BoltProtocol
Browse files Browse the repository at this point in the history
  • Loading branch information
Zhen Li authored and lutovich committed Jul 4, 2018
1 parent 4777a31 commit 6df278c
Show file tree
Hide file tree
Showing 19 changed files with 349 additions and 150 deletions.
4 changes: 4 additions & 0 deletions community/bolt/pom.xml
Expand Up @@ -108,6 +108,10 @@
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-params</artifactId>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-params</artifactId>
</dependency>
<dependency>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest-core</artifactId>
Expand Down
38 changes: 38 additions & 0 deletions community/bolt/src/main/java/org/neo4j/bolt/BoltProtocol.java
@@ -0,0 +1,38 @@
/*
* Copyright (c) 2002-2018 "Neo4j,"
* Neo4j Sweden AB [http://neo4j.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.bolt;

import org.neo4j.bolt.runtime.BoltConnection;
import org.neo4j.bolt.runtime.BoltStateMachine;
import org.neo4j.bolt.v1.messaging.BoltMessageRouter;
import org.neo4j.bolt.v1.messaging.Neo4jPack;

public interface BoltProtocol
{
Neo4jPack neo4jPack();

BoltMessageRouter messageRouter();

BoltStateMachine stateMachine();

BoltConnection connection();

long version();
}
25 changes: 13 additions & 12 deletions community/bolt/src/main/java/org/neo4j/bolt/BoltServer.java
Expand Up @@ -41,8 +41,8 @@
import org.neo4j.bolt.transport.NettyServer.ProtocolInitializer;
import org.neo4j.bolt.transport.SocketTransport;
import org.neo4j.bolt.transport.TransportThrottleGroup;
import org.neo4j.bolt.v1.runtime.BoltFactory;
import org.neo4j.bolt.v1.runtime.BoltFactoryImpl;
import org.neo4j.bolt.v1.runtime.BoltStateMachineFactory;
import org.neo4j.bolt.v1.runtime.BoltStateMachineFactoryImpl;
import org.neo4j.graphdb.DependencyResolver;
import org.neo4j.graphdb.factory.GraphDatabaseSettings;
import org.neo4j.helpers.ListenSocketAddress;
Expand Down Expand Up @@ -118,18 +118,18 @@ public void start() throws Throwable

TransportThrottleGroup throttleGroup = new TransportThrottleGroup( config, clock );

BoltFactory boltFactory = createBoltFactory( authentication );
BoltSchedulerProvider boltSchedulerProvider =
life.add( new ExecutorBoltSchedulerProvider( config, new CachedThreadPoolExecutorFactory( log ), jobScheduler, logService ) );
BoltConnectionFactory boltConnectionFactory =
createConnectionFactory( config, boltFactory, boltSchedulerProvider, throttleGroup, logService, clock );
createConnectionFactory( config, boltSchedulerProvider, throttleGroup, logService, clock );
BoltStateMachineFactory boltStateMachineFactory = createBoltFactory( authentication, clock );

BoltProtocolPipelineInstallerFactory handlerFactory = createHandlerFactory( boltConnectionFactory, throttleGroup );
BoltProtocolPipelineInstallerFactory boltProtocolInstaller = createBoltProtocolInstallerFactory( boltConnectionFactory, boltStateMachineFactory );

if ( !config.enabledBoltConnectors().isEmpty() && !config.get( GraphDatabaseSettings.disconnected ) )
{
NettyServer server = new NettyServer( jobScheduler.threadFactory( boltNetworkIO ),
createConnectors( handlerFactory, throttleGroup, boltLogging, log ), connectorPortRegister, userLog );
createConnectors( boltProtocolInstaller, throttleGroup, boltLogging, log ), connectorPortRegister, userLog );
life.add( server );
log.info( "Bolt server loaded" );
}
Expand All @@ -143,10 +143,10 @@ public void stop() throws Throwable
life.shutdown(); // stop and shutdown the nested lifecycle
}

private BoltConnectionFactory createConnectionFactory( Config config, BoltFactory boltFactory, BoltSchedulerProvider schedulerProvider,
private BoltConnectionFactory createConnectionFactory( Config config, BoltSchedulerProvider schedulerProvider,
TransportThrottleGroup throttleGroup, LogService logService, Clock clock )
{
return new DefaultBoltConnectionFactory( boltFactory, schedulerProvider, throttleGroup, logService, clock,
return new DefaultBoltConnectionFactory( schedulerProvider, throttleGroup, logService, clock,
new BoltConnectionReadLimiter( logService.getInternalLog( BoltConnectionReadLimiter.class ),
config.get( GraphDatabaseSettings.bolt_inbound_message_throttle_low_water_mark ),
config.get( GraphDatabaseSettings.bolt_inbound_message_throttle_high_water_mark ) ), monitors );
Expand Down Expand Up @@ -225,14 +225,15 @@ private Authentication createAuthentication()
dependencyResolver.resolveDependency( UserManagerSupplier.class ) );
}

private BoltProtocolPipelineInstallerFactory createHandlerFactory( BoltConnectionFactory connectionFactory, TransportThrottleGroup throttleGroup )
private BoltProtocolPipelineInstallerFactory createBoltProtocolInstallerFactory( BoltConnectionFactory connectionFactory,
BoltStateMachineFactory stateMachineFactory )
{
return new DefaultBoltProtocolPipelineInstallerFactory( connectionFactory, throttleGroup, logService );
return new DefaultBoltProtocolPipelineInstallerFactory( connectionFactory, stateMachineFactory, logService );
}

private BoltFactory createBoltFactory( Authentication authentication )
private BoltStateMachineFactory createBoltFactory( Authentication authentication, Clock clock )
{
BoltConnectionTracker connectionTracker = dependencyResolver.resolveDependency( BoltConnectionTracker.class );
return new BoltFactoryImpl( db, usageData, availabilityGuard, authentication, connectionTracker, config, logService );
return new BoltStateMachineFactoryImpl( db, usageData, availabilityGuard, authentication, connectionTracker, clock, config, logService );
}
}
Expand Up @@ -29,6 +29,6 @@ public interface BoltConnectionFactory
* @param channel the underlying channel
* @return the newly created connection instance
*/
BoltConnection newConnection( BoltChannel channel );
BoltConnection newConnection( BoltChannel channel, BoltStateMachine boltStateMachine );

}
Expand Up @@ -23,14 +23,12 @@

import org.neo4j.bolt.BoltChannel;
import org.neo4j.bolt.transport.TransportThrottleGroup;
import org.neo4j.bolt.v1.runtime.BoltFactory;
import org.neo4j.bolt.v1.transport.ChunkedOutput;
import org.neo4j.kernel.impl.logging.LogService;
import org.neo4j.kernel.monitoring.Monitors;

public class DefaultBoltConnectionFactory implements BoltConnectionFactory
{
private final BoltFactory machineFactory;
private final BoltSchedulerProvider schedulerProvider;
private final TransportThrottleGroup throttleGroup;
private final LogService logService;
Expand All @@ -39,11 +37,9 @@ public class DefaultBoltConnectionFactory implements BoltConnectionFactory
private final Monitors monitors;
private final BoltConnectionMetricsMonitor metricsMonitor;

public DefaultBoltConnectionFactory( BoltFactory machineFactory, BoltSchedulerProvider schedulerProvider, TransportThrottleGroup throttleGroup,
LogService logService, Clock clock,
BoltConnectionQueueMonitor queueMonitor, Monitors monitors )
public DefaultBoltConnectionFactory( BoltSchedulerProvider schedulerProvider, TransportThrottleGroup throttleGroup,
LogService logService, Clock clock, BoltConnectionQueueMonitor queueMonitor, Monitors monitors )
{
this.machineFactory = machineFactory;
this.schedulerProvider = schedulerProvider;
this.throttleGroup = throttleGroup;
this.logService = logService;
Expand All @@ -54,7 +50,7 @@ public DefaultBoltConnectionFactory( BoltFactory machineFactory, BoltSchedulerPr
}

@Override
public BoltConnection newConnection( BoltChannel channel )
public BoltConnection newConnection( BoltChannel channel, BoltStateMachine stateMachine )
{
BoltScheduler scheduler = schedulerProvider.get( channel );
BoltConnectionQueueMonitor connectionQueueMonitor =
Expand All @@ -64,13 +60,12 @@ public BoltConnection newConnection( BoltChannel channel )
BoltConnection connection;
if ( monitors.hasListeners( BoltConnectionMetricsMonitor.class ) )
{
connection = new MetricsReportingBoltConnection( channel, chunkedOutput, machineFactory.newMachine( channel, clock ), logService, scheduler,
connectionQueueMonitor,
metricsMonitor, clock );
connection = new MetricsReportingBoltConnection( channel, chunkedOutput, stateMachine, logService, scheduler,
connectionQueueMonitor, metricsMonitor, clock );
}
else
{
connection = new DefaultBoltConnection( channel, chunkedOutput, machineFactory.newMachine( channel, clock ), logService, scheduler,
connection = new DefaultBoltConnection( channel, chunkedOutput, stateMachine, logService, scheduler,
connectionQueueMonitor );
}

Expand Down
Expand Up @@ -23,8 +23,7 @@

import org.neo4j.bolt.BoltChannel;
import org.neo4j.bolt.messaging.BoltRequestMessageReader;
import org.neo4j.bolt.messaging.Neo4jPack;
import org.neo4j.bolt.runtime.BoltConnection;
import org.neo4j.bolt.BoltProtocol;
import org.neo4j.bolt.transport.pipeline.ChunkDecoder;
import org.neo4j.bolt.transport.pipeline.HouseKeeper;
import org.neo4j.bolt.transport.pipeline.MessageAccumulator;
Expand All @@ -42,16 +41,14 @@
public class DefaultBoltProtocolPipelineInstaller implements BoltProtocolPipelineInstaller
{
private final BoltChannel boltChannel;
private final Neo4jPack neo4jPack;
private final LogService logging;
private final BoltProtocol boltProtocol;

private final BoltConnection connection;

public DefaultBoltProtocolPipelineInstaller( BoltChannel boltChannel, BoltConnection connection, Neo4jPack neo4jPack, LogService logging )
public DefaultBoltProtocolPipelineInstaller( BoltChannel boltChannel, BoltProtocol boltProtocol, LogService logging )
{
this.boltChannel = boltChannel;
this.connection = connection;
this.neo4jPack = neo4jPack;
this.boltProtocol = boltProtocol;
this.logging = logging;
}

Expand All @@ -62,14 +59,14 @@ public void install()

pipeline.addLast( new ChunkDecoder() );
pipeline.addLast( new MessageAccumulator() );
pipeline.addLast( new MessageDecoder( neo4jPack, newRequestMessageReader(), logging ) );
pipeline.addLast( new HouseKeeper( connection, logging ) );
pipeline.addLast( new MessageDecoder( boltProtocol.neo4jPack(), boltProtocol.messageRouter(), logging ) );
pipeline.addLast( new HouseKeeper( boltProtocol.connection(), logging ) );
}

@Override
public long version()
{
return neo4jPack.version();
return boltProtocol.version();
}

private BoltRequestMessageReader newRequestMessageReader()
Expand Down
Expand Up @@ -23,48 +23,42 @@
import org.neo4j.bolt.messaging.Neo4jPack;
import org.neo4j.bolt.runtime.BoltConnection;
import org.neo4j.bolt.runtime.BoltConnectionFactory;
import org.neo4j.bolt.v1.messaging.Neo4jPackV1;
import org.neo4j.bolt.v2.messaging.Neo4jPackV2;
import org.neo4j.bolt.BoltProtocol;
import org.neo4j.bolt.v1.BoltProtocolV1;
import org.neo4j.bolt.v1.runtime.BoltStateMachineFactory;
import org.neo4j.bolt.v2.BoltProtocolV2;
import org.neo4j.kernel.impl.logging.LogService;

public class DefaultBoltProtocolPipelineInstallerFactory implements BoltProtocolPipelineInstallerFactory
{
private final BoltConnectionFactory connectionFactory;
private final TransportThrottleGroup throttleGroup;
private final LogService logService;
private final BoltStateMachineFactory stateMachineFactory;

public DefaultBoltProtocolPipelineInstallerFactory( BoltConnectionFactory connectionFactory, TransportThrottleGroup throttleGroup,
public DefaultBoltProtocolPipelineInstallerFactory( BoltConnectionFactory connectionFactory, BoltStateMachineFactory stateMachineFactory,
LogService logService )
{
this.connectionFactory = connectionFactory;
this.throttleGroup = throttleGroup;
this.stateMachineFactory = stateMachineFactory;
this.logService = logService;
}

@Override
public BoltProtocolPipelineInstaller create( long protocolVersion, BoltChannel channel )
{
if ( protocolVersion == Neo4jPackV1.VERSION )
BoltProtocol boltProtocol;
if ( protocolVersion == BoltProtocolV1.VERSION )
{
return newProtocolPipelineInstaller( channel, new Neo4jPackV1() );
boltProtocol = new BoltProtocolV1( channel, connectionFactory, stateMachineFactory, logService );
}
else if ( protocolVersion == Neo4jPackV2.VERSION )
else if ( protocolVersion == BoltProtocolV2.VERSION )
{
return newProtocolPipelineInstaller( channel, new Neo4jPackV2() );
boltProtocol = new BoltProtocolV2( channel, connectionFactory, stateMachineFactory, logService );
}
else
{
return null;
}
}

private BoltProtocolPipelineInstaller newProtocolPipelineInstaller( BoltChannel channel, Neo4jPack neo4jPack )
{
return new DefaultBoltProtocolPipelineInstaller( channel, newBoltConnection( channel ), neo4jPack, logService );
}

private BoltConnection newBoltConnection( BoltChannel channel )
{
return connectionFactory.newConnection( channel );
return new DefaultBoltProtocolPipelineInstaller( channel, boltProtocol, logService );
}
}
Expand Up @@ -38,18 +38,18 @@ public class ProtocolHandshaker extends ChannelInboundHandlerAdapter
private static final int HANDSHAKE_BUFFER_SIZE = 5 * Integer.BYTES;

private final BoltChannel boltChannel;
private final BoltProtocolPipelineInstallerFactory handlerFactory;
private final BoltProtocolPipelineInstallerFactory boltProtocolInstallerFactory;
private final Log log;
private final boolean encryptionRequired;
private final boolean encrypted;

private ByteBuf handshakeBuffer;
private BoltProtocolPipelineInstaller protocol;

public ProtocolHandshaker( BoltProtocolPipelineInstallerFactory handlerFactory, BoltChannel boltChannel, LogProvider logging, boolean encryptionRequired,
boolean encrypted )
public ProtocolHandshaker( BoltProtocolPipelineInstallerFactory boltProtocolInstallerFactory, BoltChannel boltChannel, LogProvider logging,
boolean encryptionRequired, boolean encrypted )
{
this.handlerFactory = handlerFactory;
this.boltProtocolInstallerFactory = boltProtocolInstallerFactory;
this.boltChannel = boltChannel;
this.log = logging.getLog( getClass() );
this.encryptionRequired = encryptionRequired;
Expand Down Expand Up @@ -168,7 +168,7 @@ private boolean performHandshake()
{
final long suggestion = handshakeBuffer.getInt( (i + 1) * Integer.BYTES ) & 0xFFFFFFFFL;

protocol = handlerFactory.create( suggestion, boltChannel );
protocol = boltProtocolInstallerFactory.create( suggestion, boltChannel );
if ( protocol != null )
{
boltChannel.log().serverEvent( "HANDSHAKE", () -> format( "0x%02X", suggestion ) );
Expand Down

0 comments on commit 6df278c

Please sign in to comment.