Skip to content

Commit

Permalink
Rename result_available_after and result_comsumed_after in v3 to …
Browse files Browse the repository at this point in the history
…`t_first` and `t_last`
  • Loading branch information
Zhen Li authored and lutovich committed Jul 26, 2018
1 parent 1b1090d commit cde84a8
Show file tree
Hide file tree
Showing 13 changed files with 237 additions and 110 deletions.
Expand Up @@ -27,9 +27,11 @@
import org.neo4j.bolt.v1.BoltProtocolV1;
import org.neo4j.bolt.v1.runtime.BoltStateMachineV1;
import org.neo4j.bolt.v1.runtime.BoltStateMachineV1SPI;
import org.neo4j.bolt.v1.runtime.TransactionStateMachineV1SPI;
import org.neo4j.bolt.v2.BoltProtocolV2;
import org.neo4j.bolt.v3.BoltProtocolV3;
import org.neo4j.bolt.v3.BoltStateMachineV3;
import org.neo4j.bolt.v3.runtime.TransactionStateMachineV3SPI;
import org.neo4j.graphdb.factory.GraphDatabaseSettings;
import org.neo4j.kernel.AvailabilityGuard;
import org.neo4j.kernel.configuration.Config;
Expand Down Expand Up @@ -78,22 +80,21 @@ else if ( protocolVersion == BoltProtocolV3.VERSION )

private BoltStateMachine newStateMachineV1( BoltChannel boltChannel )
{
TransactionStateMachineSPI transactionSPI = createTxSpi( clock );
BoltStateMachineSPI boltSPI = new BoltStateMachineV1SPI( boltChannel, usageData, logging, authentication, transactionSPI );
long bookmarkReadyTimeout = config.get( GraphDatabaseSettings.bookmark_ready_timeout ).toMillis();
Duration txAwaitDuration = Duration.ofMillis( bookmarkReadyTimeout );
TransactionStateMachineSPI transactionSPI = new TransactionStateMachineV1SPI( db, availabilityGuard, txAwaitDuration, clock );

BoltStateMachineSPI boltSPI = new BoltStateMachineV1SPI( boltChannel, usageData, logging, authentication, connectionTracker, transactionSPI );
return new BoltStateMachineV1( boltSPI, boltChannel, clock );
}

private BoltStateMachine newStateMachineV3( BoltChannel boltChannel )
{
TransactionStateMachineSPI transactionSPI = createTxSpi( clock );
BoltStateMachineSPI boltSPI = new BoltStateMachineV1SPI( boltChannel, usageData, logging, authentication, transactionSPI );
return new BoltStateMachineV3( boltSPI, boltChannel, clock );
}

private TransactionStateMachineSPI createTxSpi( Clock clock )
{
long bookmarkReadyTimeout = config.get( GraphDatabaseSettings.bookmark_ready_timeout ).toMillis();
Duration txAwaitDuration = Duration.ofMillis( bookmarkReadyTimeout );
return new org.neo4j.bolt.v1.runtime.TransactionStateMachineSPI( db, availabilityGuard, txAwaitDuration, clock );
TransactionStateMachineSPI transactionSPI = new TransactionStateMachineV3SPI( db, availabilityGuard, txAwaitDuration, clock );

BoltStateMachineSPI boltSPI = new BoltStateMachineV1SPI( boltChannel, usageData, logging, authentication, connectionTracker, transactionSPI );
return new BoltStateMachineV3( boltSPI, boltChannel, clock );
}
}
Expand Up @@ -41,13 +41,13 @@
import static org.neo4j.values.storable.Values.longValue;
import static org.neo4j.values.storable.Values.stringValue;

class CypherAdapterStream implements BoltResult
public class CypherAdapterStream implements BoltResult
{
private final QueryResult delegate;
private final String[] fieldNames;
private final Clock clock;

CypherAdapterStream( QueryResult delegate, Clock clock )
public CypherAdapterStream( QueryResult delegate, Clock clock )
{
this.delegate = delegate;
this.fieldNames = delegate.fieldNames();
Expand Down Expand Up @@ -75,7 +75,7 @@ public void accept( final Visitor visitor ) throws Exception
visitor.visit( row );
return true;
} );
visitor.addMetadata( "result_consumed_after", longValue( clock.millis() - start ) );
addRecordStreamingTime( visitor, clock.millis() - start );
QueryExecutionType qt = delegate.executionType();
visitor.addMetadata( "type", Values.stringValue( queryTypeCode( qt.queryType() ) ) );

Expand All @@ -98,6 +98,11 @@ public void accept( final Visitor visitor ) throws Exception
}
}

protected void addRecordStreamingTime( Visitor visitor, long time )
{
visitor.addMetadata( "result_consumed_after", longValue( time ) );
}

@Override
public String toString()
{
Expand Down
Expand Up @@ -28,6 +28,7 @@
import org.neo4j.bolt.runtime.BoltQuerySource;
import org.neo4j.bolt.runtime.BoltResult;
import org.neo4j.bolt.runtime.BoltResultHandle;
import org.neo4j.bolt.runtime.TransactionStateMachineSPI;
import org.neo4j.cypher.internal.javacompat.QueryResultProvider;
import org.neo4j.graphdb.Result;
import org.neo4j.internal.kernel.api.exceptions.KernelException;
Expand Down Expand Up @@ -55,7 +56,7 @@
import static org.neo4j.internal.kernel.api.Transaction.Type.explicit;
import static org.neo4j.internal.kernel.api.Transaction.Type.implicit;

public class TransactionStateMachineSPI implements org.neo4j.bolt.runtime.TransactionStateMachineSPI
public class TransactionStateMachineV1SPI implements TransactionStateMachineSPI
{
private static final PropertyContainerLocker locker = new PropertyContainerLocker();

Expand All @@ -67,7 +68,7 @@ public class TransactionStateMachineSPI implements org.neo4j.bolt.runtime.Transa
private final Duration txAwaitDuration;
private final Clock clock;

public TransactionStateMachineSPI( GraphDatabaseAPI db, AvailabilityGuard availabilityGuard, Duration txAwaitDuration, Clock clock )
public TransactionStateMachineV1SPI( GraphDatabaseAPI db, AvailabilityGuard availabilityGuard, Duration txAwaitDuration, Clock clock )
{
this.db = db;
this.txBridge = db.getDependencyResolver().resolveDependency( ThreadToStatementContextBridge.class );
Expand Down Expand Up @@ -127,50 +128,12 @@ public BoltResultHandle executeQuery( BoltQuerySource querySource, LoginContext
TransactionalContext transactionalContext =
contextFactory.newContext( sourceDetails, internalTransaction, statement, params );

return new BoltResultHandle()
{

@Override
public BoltResult start() throws KernelException
{
try
{
Result result = queryExecutionEngine.executeQuery( statement, params, transactionalContext );
if ( result instanceof QueryResultProvider )
{
return new CypherAdapterStream( ((QueryResultProvider) result).queryResult(), clock );
}
else
{
throw new IllegalStateException( format( "Unexpected query execution result. Expected to get instance of %s but was %s.",
QueryResultProvider.class.getName(), result.getClass().getName() ) );
}
}
catch ( KernelException e )
{
close( false );
throw new QueryExecutionKernelException( e );
}
catch ( Throwable e )
{
close( false );
throw e;
}
}

@Override
public void close( boolean success )
{
transactionalContext.close( success );
}

@Override
public void terminate()
{
transactionalContext.terminate();
}
return newBoltResultHandle( statement, params, transactionalContext );
}

};
protected BoltResultHandle newBoltResultHandle( String statement, MapValue params, TransactionalContext transactionalContext )
{
return new BoltResultHandleV1( statement, params, transactionalContext );
}

private InternalTransaction beginTransaction( KernelTransaction.Type type, LoginContext loginContext, Duration txTimeout, Map<String, Object> txMetadata )
Expand Down Expand Up @@ -203,4 +166,63 @@ private static TransactionalContextFactory newTransactionalContextFactory( Graph
GraphDatabaseQueryService queryService = db.getDependencyResolver().resolveDependency( GraphDatabaseQueryService.class );
return Neo4jTransactionalContextFactory.create( queryService, locker );
}

public class BoltResultHandleV1 implements BoltResultHandle
{
private final String statement;
private final MapValue params;
private final TransactionalContext transactionalContext;

public BoltResultHandleV1( String statement, MapValue params, TransactionalContext transactionalContext )
{
this.statement = statement;
this.params = params;
this.transactionalContext = transactionalContext;
}

@Override
public BoltResult start() throws KernelException
{
try
{
Result result = queryExecutionEngine.executeQuery( statement, params, transactionalContext );
if ( result instanceof QueryResultProvider )
{
return newBoltResult( (QueryResultProvider) result, clock );
}
else
{
throw new IllegalStateException( format( "Unexpected query execution result. Expected to get instance of %s but was %s.",
QueryResultProvider.class.getName(), result.getClass().getName() ) );
}
}
catch ( KernelException e )
{
close( false );
throw new QueryExecutionKernelException( e );
}
catch ( Throwable e )
{
close( false );
throw e;
}
}

protected BoltResult newBoltResult( QueryResultProvider result, Clock clock )
{
return new CypherAdapterStream( result.queryResult(), clock );
}

@Override
public void close( boolean success )
{
transactionalContext.close( success );
}

@Override
public void terminate()
{
transactionalContext.terminate();
}
}
}
@@ -0,0 +1,43 @@
/*
* Copyright (c) 2002-2018 "Neo4j,"
* Neo4j Sweden AB [http://neo4j.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.bolt.v3.runtime;

import java.time.Clock;

import org.neo4j.bolt.v1.runtime.CypherAdapterStream;
import org.neo4j.cypher.result.QueryResult;

import static org.neo4j.values.storable.Values.longValue;

class CypherAdapterStreamV3 extends CypherAdapterStream
{
private static final String LAST_RESULT_CONSUMED_KEY = "t_last";

CypherAdapterStreamV3( QueryResult delegate, Clock clock )
{
super( delegate, clock );
}

@Override
protected void addRecordStreamingTime( Visitor visitor, long time )
{
visitor.addMetadata( LAST_RESULT_CONSUMED_KEY, longValue( time ) );
}
}
Expand Up @@ -50,8 +50,7 @@ public class ReadyState extends FailSafeBoltStateMachineState
private BoltStateMachineState txReadyState;

static final String FIELDS_KEY = "fields";
static final String FIRST_RECORD_AVAILABLE_KEY = "result_available_after";
private static final String TX_ID_KEY = "tx_id";
static final String FIRST_RECORD_AVAILABLE_KEY = "t_first";

@Override
public BoltStateMachineState process( RequestMessage message, StateMachineContext context ) throws BoltConnectionFatality
Expand Down Expand Up @@ -103,7 +102,6 @@ private BoltStateMachineState processRunMessage( RunMessage message, StateMachin

context.connectionState().onMetadata( FIELDS_KEY, stringArray( statementMetadata.fieldNames() ) );
context.connectionState().onMetadata( FIRST_RECORD_AVAILABLE_KEY, Values.longValue( end - start ) );
context.connectionState().onMetadata( TX_ID_KEY, Values.NO_VALUE ); //TODO return tx_id

return streamingState;
}
Expand All @@ -112,7 +110,6 @@ private BoltStateMachineState processBeginMessage( BeginMessage message, StateMa
{
StatementProcessor statementProcessor = context.connectionState().getStatementProcessor();
statementProcessor.beginTransaction( message.bookmark(), message.transactionTimeout(), message.transactionMetadata() );
context.connectionState().onMetadata( TX_ID_KEY, Values.NO_VALUE ); // TODO
return txReadyState;
}

Expand Down
@@ -0,0 +1,62 @@
/*
* Copyright (c) 2002-2018 "Neo4j,"
* Neo4j Sweden AB [http://neo4j.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.bolt.v3.runtime;

import java.time.Clock;
import java.time.Duration;

import org.neo4j.bolt.runtime.BoltResult;
import org.neo4j.bolt.runtime.BoltResultHandle;
import org.neo4j.bolt.v1.runtime.TransactionStateMachineV1SPI;
import org.neo4j.cypher.internal.javacompat.QueryResultProvider;
import org.neo4j.kernel.AvailabilityGuard;
import org.neo4j.kernel.impl.query.TransactionalContext;
import org.neo4j.kernel.internal.GraphDatabaseAPI;
import org.neo4j.values.virtual.MapValue;

public class TransactionStateMachineV3SPI extends TransactionStateMachineV1SPI
{

public TransactionStateMachineV3SPI( GraphDatabaseAPI db, AvailabilityGuard availabilityGuard, Duration txAwaitDuration, Clock clock )
{
super( db, availabilityGuard, txAwaitDuration, clock );
}

@Override
protected BoltResultHandle newBoltResultHandle( String statement, MapValue params, TransactionalContext transactionalContext )
{
return new BoltResultHandleV3( statement, params, transactionalContext );
}

private class BoltResultHandleV3 extends BoltResultHandleV1
{

BoltResultHandleV3( String statement, MapValue params, TransactionalContext transactionalContext )
{
super( statement, params, transactionalContext );
}

@Override
protected BoltResult newBoltResult( QueryResultProvider result, Clock clock )
{
return new CypherAdapterStreamV3( result.queryResult(), clock );
}
}
}

0 comments on commit cde84a8

Please sign in to comment.