Skip to content

Commit

Permalink
Versioned bolt response message writer
Browse files Browse the repository at this point in the history
  • Loading branch information
Zhen Li authored and lutovich committed Jul 5, 2018
1 parent b689ef4 commit 20657cc
Show file tree
Hide file tree
Showing 91 changed files with 1,355 additions and 1,148 deletions.
@@ -0,0 +1,31 @@
/*
* Copyright (c) 2002-2018 "Neo4j,"
* Neo4j Sweden AB [http://neo4j.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.bolt.messaging;

import java.io.IOException;

/**
* Interface defining simple encoders for each defined
* Bolt response message.
*/
public interface BoltResponseMessageWriter
{
void write( ResponseMessage message ) throws IOException;
}
Expand Up @@ -17,13 +17,9 @@
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.bolt.v1.messaging.message;

import java.io.IOException;

import org.neo4j.bolt.v1.messaging.BoltResponseMessageHandler;
package org.neo4j.bolt.messaging;

public interface ResponseMessage
{
void dispatch( BoltResponseMessageHandler consumer ) throws IOException;
byte signature();
}
@@ -0,0 +1,27 @@
/*
* Copyright (c) 2002-2018 "Neo4j,"
* Neo4j Sweden AB [http://neo4j.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.bolt.messaging;

import java.io.IOException;

public interface ResponseMessageEncoder<T extends ResponseMessage>
{
void encode( Neo4jPack.Packer packer, T message ) throws IOException;
}
Expand Up @@ -40,5 +40,4 @@ public interface BoltResponseHandler

/** Called when the operation is completed. */
void onFinish();

}
Expand Up @@ -33,7 +33,7 @@
import org.neo4j.bolt.transport.pipeline.MessageAccumulator;
import org.neo4j.bolt.transport.pipeline.MessageDecoder;
import org.neo4j.bolt.v1.messaging.BoltRequestMessageReaderV1;
import org.neo4j.bolt.v1.messaging.BoltResponseMessageWriter;
import org.neo4j.bolt.v1.messaging.BoltResponseMessageWriterV1;
import org.neo4j.bolt.v1.messaging.Neo4jPackV1;
import org.neo4j.bolt.v1.runtime.BoltStateMachineFactory;
import org.neo4j.kernel.impl.logging.LogService;
Expand Down Expand Up @@ -91,7 +91,7 @@ public long version()

public static BoltRequestMessageReader createBoltMessageReaderV1( BoltChannel channel, Neo4jPack neo4jPack, BoltConnection connection, LogService logging )
{
BoltResponseMessageWriter responseWriter = new BoltResponseMessageWriter( neo4jPack, connection.output(), logging, channel.log() );
BoltResponseMessageWriterV1 responseWriter = new BoltResponseMessageWriterV1( neo4jPack, connection.output(), logging, channel.log() );
return new BoltRequestMessageReaderV1( connection, responseWriter, channel.log(), logging );
}
}
Expand Up @@ -27,48 +27,49 @@
import org.neo4j.bolt.messaging.RequestMessageDecoder;
import org.neo4j.bolt.runtime.BoltConnection;
import org.neo4j.bolt.runtime.BoltResponseHandler;
import org.neo4j.bolt.v1.messaging.decoder.AckFailureDecoder;
import org.neo4j.bolt.v1.messaging.decoder.DiscardAllDecoder;
import org.neo4j.bolt.v1.messaging.decoder.InitDecoder;
import org.neo4j.bolt.v1.messaging.decoder.PullAllDecoder;
import org.neo4j.bolt.v1.messaging.decoder.ResetDecoder;
import org.neo4j.bolt.v1.messaging.decoder.RunDecoder;
import org.neo4j.bolt.v1.messaging.decoder.AckFailureMessageDecoder;
import org.neo4j.bolt.messaging.BoltResponseMessageWriter;
import org.neo4j.bolt.v1.messaging.decoder.DiscardAllMessageDecoder;
import org.neo4j.bolt.v1.messaging.decoder.InitMessageDecoder;
import org.neo4j.bolt.v1.messaging.decoder.PullAllMessageDecoder;
import org.neo4j.bolt.v1.messaging.decoder.ResetMessageDecoder;
import org.neo4j.bolt.v1.messaging.decoder.RunMessageDecoder;
import org.neo4j.kernel.impl.logging.LogService;
import org.neo4j.logging.Log;

public class BoltRequestMessageReaderV1 extends BoltRequestMessageReader
{
public BoltRequestMessageReaderV1( BoltConnection connection, BoltResponseMessageHandler responseMessageHandler,
public BoltRequestMessageReaderV1( BoltConnection connection, BoltResponseMessageWriter responseMessageWriter,
BoltMessageLogger messageLogger, LogService logService )
{
super( connection,
newSimpleResponseHandler( connection, responseMessageHandler, logService ),
buildDecoders( connection, responseMessageHandler, messageLogger, logService ),
newSimpleResponseHandler( connection, responseMessageWriter, logService ),
buildDecoders( connection, responseMessageWriter, messageLogger, logService ),
messageLogger );
}

private static List<RequestMessageDecoder> buildDecoders( BoltConnection connection, BoltResponseMessageHandler responseMessageHandler,
private static List<RequestMessageDecoder> buildDecoders( BoltConnection connection, BoltResponseMessageWriter responseMessageWriter,
BoltMessageLogger messageLogger, LogService logService )
{
BoltResponseHandler initHandler = newSimpleResponseHandler( connection, responseMessageHandler, logService );
BoltResponseHandler runHandler = newSimpleResponseHandler( connection, responseMessageHandler, logService );
BoltResponseHandler resultHandler = new ResultHandler( responseMessageHandler, connection, internalLog( logService ) );
BoltResponseHandler defaultHandler = newSimpleResponseHandler( connection, responseMessageHandler, logService );
BoltResponseHandler initHandler = newSimpleResponseHandler( connection, responseMessageWriter, logService );
BoltResponseHandler runHandler = newSimpleResponseHandler( connection, responseMessageWriter, logService );
BoltResponseHandler resultHandler = new ResultHandler( responseMessageWriter, connection, internalLog( logService ) );
BoltResponseHandler defaultHandler = newSimpleResponseHandler( connection, responseMessageWriter, logService );

return Arrays.asList(
new InitDecoder( initHandler, messageLogger ),
new AckFailureDecoder( defaultHandler, messageLogger ),
new ResetDecoder( connection, defaultHandler, messageLogger ),
new RunDecoder( runHandler, messageLogger ),
new DiscardAllDecoder( resultHandler, messageLogger ),
new PullAllDecoder( resultHandler, messageLogger )
new InitMessageDecoder( initHandler, messageLogger ),
new AckFailureMessageDecoder( defaultHandler, messageLogger ),
new ResetMessageDecoder( connection, defaultHandler, messageLogger ),
new RunMessageDecoder( runHandler, messageLogger ),
new DiscardAllMessageDecoder( resultHandler, messageLogger ),
new PullAllMessageDecoder( resultHandler, messageLogger )
);
}

private static BoltResponseHandler newSimpleResponseHandler( BoltConnection connection,
BoltResponseMessageHandler responseMessageHandler, LogService logService )
BoltResponseMessageWriter responseMessageWriter, LogService logService )
{
return new MessageProcessingHandler( responseMessageHandler, connection, internalLog( logService ) );
return new MessageProcessingHandler( responseMessageWriter, connection, internalLog( logService ) );
}

private static Log internalLog( LogService logService )
Expand Down

This file was deleted.

0 comments on commit 20657cc

Please sign in to comment.