Skip to content

Commit

Permalink
Fix some varable names due to renaming and removed functions used in …
Browse files Browse the repository at this point in the history
…constructer
  • Loading branch information
Zhen Li authored and lutovich committed Jul 4, 2018
1 parent 182f176 commit 0fc6c54
Show file tree
Hide file tree
Showing 15 changed files with 88 additions and 68 deletions.
4 changes: 0 additions & 4 deletions community/bolt/pom.xml
Expand Up @@ -104,10 +104,6 @@
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-engine</artifactId>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-params</artifactId>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-params</artifactId>
Expand Down
14 changes: 7 additions & 7 deletions community/bolt/src/main/java/org/neo4j/bolt/BoltServer.java
Expand Up @@ -124,12 +124,12 @@ public void start() throws Throwable
createConnectionFactory( config, boltSchedulerProvider, throttleGroup, logService, clock );
BoltStateMachineFactory boltStateMachineFactory = createBoltFactory( authentication, clock );

BoltProtocolFactory boltProtocolInstaller = createBoltProtocolInstallerFactory( boltConnectionFactory, boltStateMachineFactory );
BoltProtocolFactory boltProtocolFactory = createBoltProtocolFactory( boltConnectionFactory, boltStateMachineFactory );

if ( !config.enabledBoltConnectors().isEmpty() && !config.get( GraphDatabaseSettings.disconnected ) )
{
NettyServer server = new NettyServer( jobScheduler.threadFactory( boltNetworkIO ),
createConnectors( boltProtocolInstaller, throttleGroup, boltLogging, log ), connectorPortRegister, userLog );
createConnectors( boltProtocolFactory, throttleGroup, boltLogging, log ), connectorPortRegister, userLog );
life.add( server );
log.info( "Bolt server loaded" );
}
Expand All @@ -152,15 +152,15 @@ private BoltConnectionFactory createConnectionFactory( Config config, BoltSchedu
config.get( GraphDatabaseSettings.bolt_inbound_message_throttle_high_water_mark ) ), monitors );
}

private Map<BoltConnector,ProtocolInitializer> createConnectors( BoltProtocolFactory handlerFactory,
private Map<BoltConnector,ProtocolInitializer> createConnectors( BoltProtocolFactory boltProtocolFactory,
TransportThrottleGroup throttleGroup, BoltMessageLogging boltLogging, Log log )
{
return config.enabledBoltConnectors()
.stream()
.collect( toMap( identity(), connector -> createProtocolInitializer( connector, handlerFactory, throttleGroup, boltLogging, log ) ) );
.collect( toMap( identity(), connector -> createProtocolInitializer( connector, boltProtocolFactory, throttleGroup, boltLogging, log ) ) );
}

private ProtocolInitializer createProtocolInitializer( BoltConnector connector, BoltProtocolFactory handlerFactory,
private ProtocolInitializer createProtocolInitializer( BoltConnector connector, BoltProtocolFactory boltProtocolFactory,
TransportThrottleGroup throttleGroup, BoltMessageLogging boltLogging, Log log )
{
SslContext sslCtx;
Expand Down Expand Up @@ -198,7 +198,7 @@ private ProtocolInitializer createProtocolInitializer( BoltConnector connector,

ListenSocketAddress listenAddress = config.get( connector.listen_address );
return new SocketTransport( connector.key(), listenAddress, sslCtx, requireEncryption, logService.getInternalLogProvider(), boltLogging,
throttleGroup, handlerFactory );
throttleGroup, boltProtocolFactory );
}

private static SslContext createSslContext( SslPolicyLoader sslPolicyFactory, Config config )
Expand All @@ -225,7 +225,7 @@ private Authentication createAuthentication()
dependencyResolver.resolveDependency( UserManagerSupplier.class ) );
}

private BoltProtocolFactory createBoltProtocolInstallerFactory( BoltConnectionFactory connectionFactory,
private BoltProtocolFactory createBoltProtocolFactory( BoltConnectionFactory connectionFactory,
BoltStateMachineFactory stateMachineFactory )
{
return new DefaultBoltProtocolFactory( connectionFactory, stateMachineFactory, logService );
Expand Down
Expand Up @@ -21,16 +21,14 @@

import java.io.IOException;

import org.neo4j.bolt.v1.packstream.PackInput;
import org.neo4j.bolt.v1.packstream.PackOutput;
import org.neo4j.values.AnyValue;
import org.neo4j.values.virtual.MapValue;

/**
* Represents a single Bolt message format by exposing a {@link Packer packer} and {@link Unpacker unpacker}
* for primitives of this format.
*/
public interface Neo4jPack
public interface Neo4jPack extends PackProvider, UnpackerProvider
{
interface Packer
{
Expand Down Expand Up @@ -61,10 +59,5 @@ interface Unpacker

long unpackListHeader() throws IOException;
}

Packer newPacker( PackOutput output );

Unpacker newUnpacker( PackInput input );

long version();
}
@@ -0,0 +1,27 @@
/*
* Copyright (c) 2002-2018 "Neo4j,"
* Neo4j Sweden AB [http://neo4j.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.bolt.messaging;

import org.neo4j.bolt.v1.packstream.PackOutput;

public interface PackProvider
{
Neo4jPack.Packer newPacker( PackOutput output );
}
@@ -0,0 +1,27 @@
/*
* Copyright (c) 2002-2018 "Neo4j,"
* Neo4j Sweden AB [http://neo4j.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.bolt.messaging;

import org.neo4j.bolt.v1.packstream.PackInput;

public interface UnpackerProvider
{
Neo4jPack.Unpacker newUnpacker( PackInput input );
}
Expand Up @@ -27,6 +27,7 @@ public interface BoltConnectionFactory
* Create a new connection bound to the specified channel
*
* @param channel the underlying channel
* @param boltStateMachine to handle state change of the connection
* @return the newly created connection instance
*/
BoltConnection newConnection( BoltChannel channel, BoltStateMachine boltStateMachine );
Expand Down
Expand Up @@ -25,7 +25,7 @@
/**
* Represents a component that instantiates Bolt protocol handlers.
*
* @see BoltProtocolPipelineInstaller
* @see BoltProtocol
*/
@FunctionalInterface
public interface BoltProtocolFactory
Expand Down
Expand Up @@ -40,12 +40,12 @@ public class SocketTransport implements NettyServer.ProtocolInitializer
private final LogProvider logging;
private final BoltMessageLogging boltLogging;
private final TransportThrottleGroup throttleGroup;
private final BoltProtocolFactory handlerFactory;
private final BoltProtocolFactory boltProtocolFactory;

public SocketTransport( String connector, ListenSocketAddress address, SslContext sslCtx, boolean encryptionRequired,
LogProvider logging, BoltMessageLogging boltLogging,
TransportThrottleGroup throttleGroup,
BoltProtocolFactory handlerFactory )
BoltProtocolFactory boltProtocolFactory )
{
this.connector = connector;
this.address = address;
Expand All @@ -54,7 +54,7 @@ public SocketTransport( String connector, ListenSocketAddress address, SslContex
this.logging = logging;
this.boltLogging = boltLogging;
this.throttleGroup = throttleGroup;
this.handlerFactory = handlerFactory;
this.boltProtocolFactory = boltProtocolFactory;
}

@Override
Expand All @@ -74,7 +74,7 @@ public void initChannel( SocketChannel ch )
ch.closeFuture().addListener( future -> throttleGroup.uninstall( ch ) );

TransportSelectionHandler transportSelectionHandler = new TransportSelectionHandler( connector, sslCtx,
encryptionRequired, false, logging, handlerFactory, boltLogging );
encryptionRequired, false, logging, boltProtocolFactory, boltLogging );

ch.pipeline().addLast( transportSelectionHandler );
}
Expand Down
Expand Up @@ -52,18 +52,18 @@ public class TransportSelectionHandler extends ByteToMessageDecoder
private final boolean isEncrypted;
private final LogProvider logging;
private final BoltMessageLogging boltLogging;
private final BoltProtocolFactory handlerFactory;
private final BoltProtocolFactory boltProtocolFactory;

TransportSelectionHandler( String connector, SslContext sslCtx, boolean encryptionRequired, boolean isEncrypted, LogProvider logging,
BoltProtocolFactory handlerFactory, BoltMessageLogging boltLogging )
BoltProtocolFactory boltProtocolFactory, BoltMessageLogging boltLogging )
{
this.connector = connector;
this.sslCtx = sslCtx;
this.encryptionRequired = encryptionRequired;
this.isEncrypted = isEncrypted;
this.logging = logging;
this.boltLogging = boltLogging;
this.handlerFactory = handlerFactory;
this.boltProtocolFactory = boltProtocolFactory;
}

@Override
Expand Down Expand Up @@ -121,7 +121,7 @@ private void enableSsl( ChannelHandlerContext ctx )
{
ChannelPipeline p = ctx.pipeline();
p.addLast( sslCtx.newHandler( ctx.alloc() ) );
p.addLast( new TransportSelectionHandler( connector, null, encryptionRequired, true, logging, handlerFactory, boltLogging ) );
p.addLast( new TransportSelectionHandler( connector, null, encryptionRequired, true, logging, boltProtocolFactory, boltLogging ) );
p.remove( this );
}

Expand All @@ -147,7 +147,7 @@ private void switchToWebsocket( ChannelHandlerContext ctx )

private ProtocolHandshaker newHandshaker( ChannelHandlerContext ctx )
{
return new ProtocolHandshaker( handlerFactory, BoltChannel.open( connector, ctx.channel(), boltLogging.newLogger( ctx.channel() ) ), logging,
return new ProtocolHandshaker( boltProtocolFactory, BoltChannel.open( connector, ctx.channel(), boltLogging.newLogger( ctx.channel() ) ), logging,
encryptionRequired, isEncrypted );
}
}
Expand Up @@ -23,12 +23,10 @@
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

import java.util.function.Function;

import org.neo4j.bolt.messaging.BoltRequestMessageReader;
import org.neo4j.bolt.messaging.Neo4jPack;
import org.neo4j.bolt.messaging.UnpackerProvider;
import org.neo4j.bolt.v1.packstream.ByteBufInput;
import org.neo4j.bolt.v1.packstream.PackInput;
import org.neo4j.kernel.impl.logging.LogService;
import org.neo4j.logging.Log;

Expand All @@ -41,10 +39,10 @@ public class MessageDecoder extends SimpleChannelInboundHandler<ByteBuf>
private final BoltRequestMessageReader reader;
private final Log log;

public MessageDecoder( Function<PackInput, Neo4jPack.Unpacker> unpackProvider, BoltRequestMessageReader reader, LogService logService )
public MessageDecoder( UnpackerProvider unpackProvider, BoltRequestMessageReader reader, LogService logService )
{
this.input = new ByteBufInput();
this.unpacker = unpackProvider.apply( input );
this.unpacker = unpackProvider.newUnpacker( input );
this.reader = reader;
this.log = logService.getInternalLog( getClass() );
}
Expand Down
Expand Up @@ -74,7 +74,7 @@ public void install()

pipeline.addLast( new ChunkDecoder() );
pipeline.addLast( new MessageAccumulator() );
pipeline.addLast( new MessageDecoder( neo4jPack::newUnpacker, messageReader, logging ) );
pipeline.addLast( new MessageDecoder( neo4jPack, messageReader, logging ) );
pipeline.addLast( new HouseKeeper( connection, logging ) );
}

Expand All @@ -91,7 +91,7 @@ public long version()

public static BoltRequestMessageReader createBoltMessageReaderV1( BoltChannel channel, Neo4jPack neo4jPack, BoltConnection connection, LogService logging )
{
BoltResponseMessageWriter responseWriter = new BoltResponseMessageWriter( neo4jPack::newPacker, connection.output(), logging, channel.log() );
BoltResponseMessageWriter responseWriter = new BoltResponseMessageWriter( neo4jPack, connection.output(), logging, channel.log() );
return new BoltRequestMessageReaderV1( connection, responseWriter, channel.log(), logging );
}
}
Expand Up @@ -20,10 +20,10 @@
package org.neo4j.bolt.v1.messaging;

import java.io.IOException;
import java.util.function.Function;

import org.neo4j.bolt.logging.BoltMessageLogger;
import org.neo4j.bolt.messaging.Neo4jPack;
import org.neo4j.bolt.messaging.PackProvider;
import org.neo4j.bolt.v1.packstream.PackOutput;
import org.neo4j.cypher.result.QueryResult;
import org.neo4j.function.ThrowingAction;
Expand All @@ -48,11 +48,11 @@ public class BoltResponseMessageWriter implements BoltResponseMessageHandler
private final BoltMessageLogger messageLogger;
private final Log log;

public BoltResponseMessageWriter( Function<PackOutput,Neo4jPack.Packer> packerProvider, PackOutput output, LogService logService,
public BoltResponseMessageWriter( PackProvider packerProvider, PackOutput output, LogService logService,
BoltMessageLogger messageLogger )
{
this.output = output;
this.packer = packerProvider.apply( output );
this.packer = packerProvider.newPacker( output );
this.messageLogger = messageLogger;
this.log = logService.getInternalLog( getClass() );
}
Expand Down
Expand Up @@ -30,6 +30,7 @@ public interface BoltStateMachineFactory
/**
* Generate a new state machine.
*
* @param protocolVersion used to select state machine version
* @param boltChannel channel over which Bolt massages can be exchanged
* @return new {@link BoltStateMachine} instance
*/
Expand Down
Expand Up @@ -27,6 +27,8 @@
import org.neo4j.bolt.BoltChannel;
import org.neo4j.bolt.runtime.BoltStateMachine;
import org.neo4j.bolt.security.auth.Authentication;
import org.neo4j.bolt.v1.BoltProtocolV1;
import org.neo4j.bolt.v2.BoltProtocolV2;
import org.neo4j.graphdb.DependencyResolver;
import org.neo4j.kernel.AvailabilityGuard;
import org.neo4j.kernel.GraphDatabaseQueryService;
Expand All @@ -47,8 +49,8 @@ public class BoltStateMachineFactoryImplTest
private static final Clock CLOCK = Clock.systemUTC();
private static final BoltChannel CHANNEL = mock( BoltChannel.class );

@ParameterizedTest
@ValueSource( longs = {1, 2} )
@ParameterizedTest( name = "V{0}" )
@ValueSource( longs = {BoltProtocolV1.VERSION, BoltProtocolV2.VERSION} )
public void shouldCreateBoltStateMachines( long protocolVersion )
{
BoltStateMachineFactoryImpl factory = newBoltFactory();
Expand Down
Expand Up @@ -20,7 +20,6 @@
package org.neo4j.bolt.v1.transport.socket;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.embedded.EmbeddedChannel;
import org.junit.After;
import org.junit.Test;
Expand All @@ -34,15 +33,12 @@
import org.neo4j.bolt.BoltChannel;
import org.neo4j.bolt.BoltProtocol;
import org.neo4j.bolt.logging.NullBoltMessageLogger;
import org.neo4j.bolt.messaging.BoltRequestMessageReader;
import org.neo4j.bolt.messaging.Neo4jPack;
import org.neo4j.bolt.messaging.RequestMessage;
import org.neo4j.bolt.runtime.BoltResponseHandler;
import org.neo4j.bolt.runtime.BoltStateMachine;
import org.neo4j.bolt.runtime.SynchronousBoltConnection;
import org.neo4j.bolt.transport.pipeline.ChunkDecoder;
import org.neo4j.bolt.transport.pipeline.MessageAccumulator;
import org.neo4j.bolt.transport.pipeline.MessageDecoder;
import org.neo4j.bolt.v1.BoltProtocolV1;
import org.neo4j.bolt.v1.messaging.BoltRequestMessageWriter;
import org.neo4j.bolt.v1.messaging.Neo4jPackV1;
import org.neo4j.bolt.v1.messaging.RecordingByteChannel;
Expand All @@ -60,7 +56,6 @@
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.neo4j.bolt.v1.BoltProtocolV1.createBoltMessageReaderV1;
import static org.neo4j.values.virtual.VirtualValues.EMPTY_MAP;

/**
Expand Down Expand Up @@ -144,27 +139,7 @@ private void testPermutation( byte[] unfragmented, ByteBuf[] fragments ) throws
BoltStateMachine machine = mock( BoltStateMachine.class );
SynchronousBoltConnection boltConnection = new SynchronousBoltConnection( machine );
NullLogService logging = NullLogService.getInstance();
BoltRequestMessageReader messageReader = createBoltMessageReaderV1( boltChannel, neo4jPack, boltConnection, logging );
BoltProtocol boltProtocol = new BoltProtocol()
{
@Override
public void install()
{
ChannelPipeline pipeline = boltChannel.rawChannel().pipeline();

pipeline.addLast( new ChunkDecoder() );
pipeline.addLast( new MessageAccumulator() );

pipeline.addLast( new MessageDecoder( neo4jPack::newUnpacker, messageReader, logging ) );
}

@Override
public long version()
{
return -1;
}
};

BoltProtocol boltProtocol = new BoltProtocolV1( boltChannel, ( ch, s ) -> boltConnection, ( v, ch ) -> machine, logging );
boltProtocol.install();

// When data arrives split up according to the current permutation
Expand Down

0 comments on commit 0fc6c54

Please sign in to comment.