Skip to content

Commit

Permalink
Allow easier Bolt message versioning
Browse files Browse the repository at this point in the history
Previously, decoding of inbound Bolt messages and their routing to the
state machine was very protocol V1 specific and not extensible. This
commit makes it possible to register protocol version specific message
deserializers. It also makes code always use same inbound message
classes. Two different implementations existed before - one for
production code and one for tests.

Central component is `BoltRequestMessageReader` which holds a
configurable set of message deserializers. Correct deserializer
is looked up by a message signature.
  • Loading branch information
lutovich committed Jul 3, 2018
1 parent cf81ffc commit e67c4d6
Show file tree
Hide file tree
Showing 108 changed files with 2,063 additions and 1,626 deletions.
Expand Up @@ -17,7 +17,7 @@
* You should have received a copy of the GNU General Public License * You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
package org.neo4j.bolt.v1.messaging; package org.neo4j.bolt.messaging;


import java.io.IOException; import java.io.IOException;


Expand Down
@@ -0,0 +1,101 @@
/*
* Copyright (c) 2002-2018 "Neo4j,"
* Neo4j Sweden AB [http://neo4j.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.bolt.messaging;

import java.io.IOException;
import java.util.List;
import java.util.Map;

import org.neo4j.bolt.logging.BoltMessageLogger;
import org.neo4j.bolt.runtime.BoltConnection;
import org.neo4j.bolt.runtime.BoltResponseHandler;
import org.neo4j.bolt.runtime.Neo4jError;
import org.neo4j.bolt.v1.packstream.PackStream;
import org.neo4j.kernel.api.exceptions.Status;

import static java.util.function.Function.identity;
import static java.util.stream.Collectors.toMap;

/**
* Reader for Bolt request messages made available via a {@link Neo4jPack.Unpacker}.
*/
public abstract class BoltRequestMessageReader
{
private final BoltConnection connection;
private final BoltResponseHandler externalErrorResponseHandler;
private final BoltMessageLogger messageLogger;
private final Map<Integer,RequestMessageDecoder> decoders;

protected BoltRequestMessageReader( BoltConnection connection, BoltResponseHandler externalErrorResponseHandler,
List<RequestMessageDecoder> decoders, BoltMessageLogger messageLogger )
{
this.connection = connection;
this.externalErrorResponseHandler = externalErrorResponseHandler;
this.messageLogger = messageLogger;
this.decoders = decoders.stream().collect( toMap( RequestMessageDecoder::signature, identity() ) );
}

public void read( Neo4jPack.Unpacker unpacker ) throws IOException
{
try
{
doRead( unpacker );
}
catch ( BoltIOException e )
{
if ( e.causesFailureMessage() )
{
Neo4jError error = Neo4jError.from( e );
messageLogger.clientEvent( "ERROR", error::message );
connection.enqueue( stateMachine -> stateMachine.handleExternalFailure( error, externalErrorResponseHandler ) );
}
else
{
throw e;
}
}
}

private void doRead( Neo4jPack.Unpacker unpacker ) throws IOException
{
try
{
unpacker.unpackStructHeader();
int signature = unpacker.unpackStructSignature();

RequestMessageDecoder decoder = decoders.get( signature );
if ( decoder == null )
{
throw new BoltIOException( Status.Request.InvalidFormat,
String.format( "Message 0x%s is not a valid message signature.", Integer.toHexString( signature ) ) );
}

RequestMessage message = decoder.decode( unpacker );
BoltResponseHandler resonseHandler = decoder.resonseHandler();

connection.enqueue( stateMachine -> stateMachine.process( message, resonseHandler ) );
}
catch ( PackStream.PackStreamException e )
{
throw new BoltIOException( Status.Request.InvalidFormat,
String.format( "Unable to read message type. Error was: %s.", e.getMessage() ), e );
}
}
}
Expand Up @@ -17,7 +17,7 @@
* You should have received a copy of the GNU General Public License * You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
package org.neo4j.bolt.v1.messaging; package org.neo4j.bolt.messaging;


import java.io.IOException; import java.io.IOException;


Expand Down
Expand Up @@ -17,11 +17,9 @@
* You should have received a copy of the GNU General Public License * You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
package org.neo4j.bolt.v1.messaging.message; package org.neo4j.bolt.messaging;

import org.neo4j.bolt.v1.messaging.BoltRequestMessageHandler;


public interface RequestMessage public interface RequestMessage
{ {
void dispatch( BoltRequestMessageHandler consumer ); boolean safeToProcessInAnyState();
} }
Expand Up @@ -17,18 +17,17 @@
* You should have received a copy of the GNU General Public License * You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
package org.neo4j.bolt.v1.messaging; package org.neo4j.bolt.messaging;


import java.util.ArrayList; import java.io.IOException;
import java.util.List;


public abstract class MessageRecorder<T> import org.neo4j.bolt.runtime.BoltResponseHandler;

public interface RequestMessageDecoder
{ {
protected List<T> messages = new ArrayList<>(); int signature();


public List<T> asList() BoltResponseHandler resonseHandler();
{
return messages;
}


RequestMessage decode( Neo4jPack.Unpacker unpacker ) throws IOException;
} }
Expand Up @@ -19,12 +19,13 @@
*/ */
package org.neo4j.bolt.runtime; package org.neo4j.bolt.runtime;


import org.neo4j.bolt.messaging.RequestMessage;
import org.neo4j.internal.kernel.api.exceptions.KernelException; import org.neo4j.internal.kernel.api.exceptions.KernelException;
import org.neo4j.kernel.api.bolt.ManagedBoltStateMachine; import org.neo4j.kernel.api.bolt.ManagedBoltStateMachine;


public interface BoltStateMachine extends ManagedBoltStateMachine, AutoCloseable public interface BoltStateMachine extends ManagedBoltStateMachine, AutoCloseable
{ {
void process( StateMachineMessage message, BoltResponseHandler handler ) throws BoltConnectionFatality; void process( RequestMessage message, BoltResponseHandler handler ) throws BoltConnectionFatality;


boolean shouldStickOnThread(); boolean shouldStickOnThread();


Expand Down
Expand Up @@ -19,9 +19,11 @@
*/ */
package org.neo4j.bolt.runtime; package org.neo4j.bolt.runtime;


import org.neo4j.bolt.messaging.RequestMessage;

public interface BoltStateMachineState public interface BoltStateMachineState
{ {
BoltStateMachineState process( StateMachineMessage message, StateMachineContext context ) throws BoltConnectionFatality; BoltStateMachineState process( RequestMessage message, StateMachineContext context ) throws BoltConnectionFatality;


String name(); String name();
} }

This file was deleted.

Expand Up @@ -22,15 +22,15 @@
import io.netty.channel.ChannelPipeline; import io.netty.channel.ChannelPipeline;


import org.neo4j.bolt.BoltChannel; import org.neo4j.bolt.BoltChannel;
import org.neo4j.bolt.messaging.BoltRequestMessageReader;
import org.neo4j.bolt.messaging.Neo4jPack;
import org.neo4j.bolt.runtime.BoltConnection; import org.neo4j.bolt.runtime.BoltConnection;
import org.neo4j.bolt.transport.pipeline.ChunkDecoder; import org.neo4j.bolt.transport.pipeline.ChunkDecoder;
import org.neo4j.bolt.transport.pipeline.HouseKeeper; import org.neo4j.bolt.transport.pipeline.HouseKeeper;
import org.neo4j.bolt.transport.pipeline.MessageAccumulator; import org.neo4j.bolt.transport.pipeline.MessageAccumulator;
import org.neo4j.bolt.transport.pipeline.MessageDecoder; import org.neo4j.bolt.transport.pipeline.MessageDecoder;
import org.neo4j.bolt.v1.messaging.BoltMessageRouter; import org.neo4j.bolt.v1.messaging.BoltRequestMessageReaderV1;
import org.neo4j.bolt.v1.messaging.BoltRequestMessageHandler;
import org.neo4j.bolt.v1.messaging.BoltResponseMessageWriter; import org.neo4j.bolt.v1.messaging.BoltResponseMessageWriter;
import org.neo4j.bolt.v1.messaging.Neo4jPack;
import org.neo4j.kernel.impl.logging.LogService; import org.neo4j.kernel.impl.logging.LogService;


/** /**
Expand All @@ -43,30 +43,26 @@ public class DefaultBoltProtocolPipelineInstaller implements BoltProtocolPipelin
{ {
private final BoltChannel boltChannel; private final BoltChannel boltChannel;
private final Neo4jPack neo4jPack; private final Neo4jPack neo4jPack;
private final BoltResponseMessageWriter responseWriter;
private final BoltRequestMessageHandler messageHandler;
private final LogService logging; private final LogService logging;


private final BoltConnection connection; private final BoltConnection connection;


public DefaultBoltProtocolPipelineInstaller( BoltChannel boltChannel, BoltConnection connection, Neo4jPack neo4jPack, TransportThrottleGroup throttleGroup, public DefaultBoltProtocolPipelineInstaller( BoltChannel boltChannel, BoltConnection connection, Neo4jPack neo4jPack, LogService logging )
LogService logging )
{ {
this.boltChannel = boltChannel; this.boltChannel = boltChannel;
this.connection = connection; this.connection = connection;
this.neo4jPack = neo4jPack; this.neo4jPack = neo4jPack;
this.responseWriter = new BoltResponseMessageWriter( neo4jPack, connection.output(), logging, boltChannel.log() );
this.messageHandler = new BoltMessageRouter( logging.getInternalLog( getClass() ), boltChannel.log(), connection, responseWriter );
this.logging = logging; this.logging = logging;
} }


@Override
public void install() public void install()
{ {
ChannelPipeline pipeline = boltChannel.rawChannel().pipeline(); ChannelPipeline pipeline = boltChannel.rawChannel().pipeline();


pipeline.addLast( new ChunkDecoder() ); pipeline.addLast( new ChunkDecoder() );
pipeline.addLast( new MessageAccumulator() ); pipeline.addLast( new MessageAccumulator() );
pipeline.addLast( new MessageDecoder( neo4jPack, messageHandler, logging ) ); pipeline.addLast( new MessageDecoder( neo4jPack, newRequestMessageReader(), logging ) );
pipeline.addLast( new HouseKeeper( connection, logging ) ); pipeline.addLast( new HouseKeeper( connection, logging ) );
} }


Expand All @@ -75,4 +71,10 @@ public long version()
{ {
return neo4jPack.version(); return neo4jPack.version();
} }

private BoltRequestMessageReader newRequestMessageReader()
{
BoltResponseMessageWriter responseMessageWriter = new BoltResponseMessageWriter( neo4jPack, connection.output(), logging, boltChannel.log() );
return new BoltRequestMessageReaderV1( connection, responseMessageWriter, boltChannel.log(), logging );
}
} }
Expand Up @@ -20,9 +20,9 @@
package org.neo4j.bolt.transport; package org.neo4j.bolt.transport;


import org.neo4j.bolt.BoltChannel; import org.neo4j.bolt.BoltChannel;
import org.neo4j.bolt.messaging.Neo4jPack;
import org.neo4j.bolt.runtime.BoltConnection; import org.neo4j.bolt.runtime.BoltConnection;
import org.neo4j.bolt.runtime.BoltConnectionFactory; import org.neo4j.bolt.runtime.BoltConnectionFactory;
import org.neo4j.bolt.v1.messaging.Neo4jPack;
import org.neo4j.bolt.v1.messaging.Neo4jPackV1; import org.neo4j.bolt.v1.messaging.Neo4jPackV1;
import org.neo4j.bolt.v2.messaging.Neo4jPackV2; import org.neo4j.bolt.v2.messaging.Neo4jPackV2;
import org.neo4j.kernel.impl.logging.LogService; import org.neo4j.kernel.impl.logging.LogService;
Expand Down Expand Up @@ -60,7 +60,7 @@ else if ( protocolVersion == Neo4jPackV2.VERSION )


private BoltProtocolPipelineInstaller newProtocolPipelineInstaller( BoltChannel channel, Neo4jPack neo4jPack ) private BoltProtocolPipelineInstaller newProtocolPipelineInstaller( BoltChannel channel, Neo4jPack neo4jPack )
{ {
return new DefaultBoltProtocolPipelineInstaller( channel, newBoltConnection( channel ), neo4jPack, throttleGroup, logService ); return new DefaultBoltProtocolPipelineInstaller( channel, newBoltConnection( channel ), neo4jPack, logService );
} }


private BoltConnection newBoltConnection( BoltChannel channel ) private BoltConnection newBoltConnection( BoltChannel channel )
Expand Down
Expand Up @@ -23,11 +23,8 @@
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler; import io.netty.channel.SimpleChannelInboundHandler;


import org.neo4j.bolt.runtime.Neo4jError; import org.neo4j.bolt.messaging.BoltRequestMessageReader;
import org.neo4j.bolt.v1.messaging.BoltIOException; import org.neo4j.bolt.messaging.Neo4jPack;
import org.neo4j.bolt.v1.messaging.BoltRequestMessageHandler;
import org.neo4j.bolt.v1.messaging.BoltRequestMessageReader;
import org.neo4j.bolt.v1.messaging.Neo4jPack;
import org.neo4j.bolt.v1.packstream.ByteBufInput; import org.neo4j.bolt.v1.packstream.ByteBufInput;
import org.neo4j.kernel.impl.logging.LogService; import org.neo4j.kernel.impl.logging.LogService;
import org.neo4j.logging.Log; import org.neo4j.logging.Log;
Expand All @@ -37,15 +34,15 @@
public class MessageDecoder extends SimpleChannelInboundHandler<ByteBuf> public class MessageDecoder extends SimpleChannelInboundHandler<ByteBuf>
{ {
private final ByteBufInput input; private final ByteBufInput input;
private final Neo4jPack.Unpacker unpacker;
private final BoltRequestMessageReader reader; private final BoltRequestMessageReader reader;
private final BoltRequestMessageHandler messageHandler;
private final Log log; private final Log log;


public MessageDecoder( Neo4jPack pack, BoltRequestMessageHandler messageHandler, LogService logService ) public MessageDecoder( Neo4jPack pack, BoltRequestMessageReader reader, LogService logService )
{ {
this.input = new ByteBufInput(); this.input = new ByteBufInput();
this.reader = new BoltRequestMessageReader( pack.newUnpacker( input ) ); this.unpacker = pack.newUnpacker( input );
this.messageHandler = messageHandler; this.reader = reader;
this.log = logService.getInternalLog( getClass() ); this.log = logService.getInternalLog( getClass() );
} }


Expand All @@ -56,19 +53,7 @@ protected void channelRead0( ChannelHandlerContext channelHandlerContext, ByteBu
byteBuf.markReaderIndex(); byteBuf.markReaderIndex();
try try
{ {
reader.read( messageHandler ); reader.read( unpacker );
}
catch ( BoltIOException ex )
{
if ( ex.causesFailureMessage() )
{
messageHandler.onExternalError( Neo4jError.from( ex ) );
}
else
{
logMessageOnError( byteBuf );
throw ex;
}
} }
catch ( Throwable error ) catch ( Throwable error )
{ {
Expand Down

0 comments on commit e67c4d6

Please sign in to comment.