From cde84a89e4cd63b012087a8882b57fa4f27b99d2 Mon Sep 17 00:00:00 2001 From: Zhen Li Date: Mon, 23 Jul 2018 12:46:06 +0200 Subject: [PATCH] Rename `result_available_after` and `result_comsumed_after` in v3 to `t_first` and `t_last` --- .../runtime/BoltStateMachineFactoryImpl.java | 21 ++-- .../bolt/v1/runtime/CypherAdapterStream.java | 11 +- ...java => TransactionStateMachineV1SPI.java} | 112 +++++++++++------- .../v3/runtime/CypherAdapterStreamV3.java | 43 +++++++ .../org/neo4j/bolt/v3/runtime/ReadyState.java | 5 +- .../runtime/TransactionStateMachineV3SPI.java | 62 ++++++++++ .../runtime/TransactionStateMachineTest.java | 40 +++---- ... => TransactionStateMachineV1SPITest.java} | 12 +- .../integration/BoltV3TransportIT.java | 30 ++--- .../v3/runtime/integration/ReadyStateIT.java | 4 +- .../runtime/integration/StreamingStateIT.java | 2 +- .../integration/TransactionReadyStateIT.java | 3 +- .../TransactionStreamingStateIT.java | 2 +- 13 files changed, 237 insertions(+), 110 deletions(-) rename community/bolt/src/main/java/org/neo4j/bolt/v1/runtime/{TransactionStateMachineSPI.java => TransactionStateMachineV1SPI.java} (73%) create mode 100644 community/bolt/src/main/java/org/neo4j/bolt/v3/runtime/CypherAdapterStreamV3.java create mode 100644 community/bolt/src/main/java/org/neo4j/bolt/v3/runtime/TransactionStateMachineV3SPI.java rename community/bolt/src/test/java/org/neo4j/bolt/v1/runtime/{TransactionStateMachineSPITest.java => TransactionStateMachineV1SPITest.java} (89%) diff --git a/community/bolt/src/main/java/org/neo4j/bolt/runtime/BoltStateMachineFactoryImpl.java b/community/bolt/src/main/java/org/neo4j/bolt/runtime/BoltStateMachineFactoryImpl.java index fb55d17aa43f1..55c49bed1bceb 100644 --- a/community/bolt/src/main/java/org/neo4j/bolt/runtime/BoltStateMachineFactoryImpl.java +++ b/community/bolt/src/main/java/org/neo4j/bolt/runtime/BoltStateMachineFactoryImpl.java @@ -27,9 +27,11 @@ import org.neo4j.bolt.v1.BoltProtocolV1; import org.neo4j.bolt.v1.runtime.BoltStateMachineV1; import org.neo4j.bolt.v1.runtime.BoltStateMachineV1SPI; +import org.neo4j.bolt.v1.runtime.TransactionStateMachineV1SPI; import org.neo4j.bolt.v2.BoltProtocolV2; import org.neo4j.bolt.v3.BoltProtocolV3; import org.neo4j.bolt.v3.BoltStateMachineV3; +import org.neo4j.bolt.v3.runtime.TransactionStateMachineV3SPI; import org.neo4j.graphdb.factory.GraphDatabaseSettings; import org.neo4j.kernel.AvailabilityGuard; import org.neo4j.kernel.configuration.Config; @@ -78,22 +80,21 @@ else if ( protocolVersion == BoltProtocolV3.VERSION ) private BoltStateMachine newStateMachineV1( BoltChannel boltChannel ) { - TransactionStateMachineSPI transactionSPI = createTxSpi( clock ); - BoltStateMachineSPI boltSPI = new BoltStateMachineV1SPI( boltChannel, usageData, logging, authentication, transactionSPI ); + long bookmarkReadyTimeout = config.get( GraphDatabaseSettings.bookmark_ready_timeout ).toMillis(); + Duration txAwaitDuration = Duration.ofMillis( bookmarkReadyTimeout ); + TransactionStateMachineSPI transactionSPI = new TransactionStateMachineV1SPI( db, availabilityGuard, txAwaitDuration, clock ); + + BoltStateMachineSPI boltSPI = new BoltStateMachineV1SPI( boltChannel, usageData, logging, authentication, connectionTracker, transactionSPI ); return new BoltStateMachineV1( boltSPI, boltChannel, clock ); } private BoltStateMachine newStateMachineV3( BoltChannel boltChannel ) - { - TransactionStateMachineSPI transactionSPI = createTxSpi( clock ); - BoltStateMachineSPI boltSPI = new BoltStateMachineV1SPI( boltChannel, usageData, logging, authentication, transactionSPI ); - return new BoltStateMachineV3( boltSPI, boltChannel, clock ); - } - - private TransactionStateMachineSPI createTxSpi( Clock clock ) { long bookmarkReadyTimeout = config.get( GraphDatabaseSettings.bookmark_ready_timeout ).toMillis(); Duration txAwaitDuration = Duration.ofMillis( bookmarkReadyTimeout ); - return new org.neo4j.bolt.v1.runtime.TransactionStateMachineSPI( db, availabilityGuard, txAwaitDuration, clock ); + TransactionStateMachineSPI transactionSPI = new TransactionStateMachineV3SPI( db, availabilityGuard, txAwaitDuration, clock ); + + BoltStateMachineSPI boltSPI = new BoltStateMachineV1SPI( boltChannel, usageData, logging, authentication, connectionTracker, transactionSPI ); + return new BoltStateMachineV3( boltSPI, boltChannel, clock ); } } diff --git a/community/bolt/src/main/java/org/neo4j/bolt/v1/runtime/CypherAdapterStream.java b/community/bolt/src/main/java/org/neo4j/bolt/v1/runtime/CypherAdapterStream.java index 029e1aefe584f..ad7aa51b21811 100644 --- a/community/bolt/src/main/java/org/neo4j/bolt/v1/runtime/CypherAdapterStream.java +++ b/community/bolt/src/main/java/org/neo4j/bolt/v1/runtime/CypherAdapterStream.java @@ -41,13 +41,13 @@ import static org.neo4j.values.storable.Values.longValue; import static org.neo4j.values.storable.Values.stringValue; -class CypherAdapterStream implements BoltResult +public class CypherAdapterStream implements BoltResult { private final QueryResult delegate; private final String[] fieldNames; private final Clock clock; - CypherAdapterStream( QueryResult delegate, Clock clock ) + public CypherAdapterStream( QueryResult delegate, Clock clock ) { this.delegate = delegate; this.fieldNames = delegate.fieldNames(); @@ -75,7 +75,7 @@ public void accept( final Visitor visitor ) throws Exception visitor.visit( row ); return true; } ); - visitor.addMetadata( "result_consumed_after", longValue( clock.millis() - start ) ); + addRecordStreamingTime( visitor, clock.millis() - start ); QueryExecutionType qt = delegate.executionType(); visitor.addMetadata( "type", Values.stringValue( queryTypeCode( qt.queryType() ) ) ); @@ -98,6 +98,11 @@ public void accept( final Visitor visitor ) throws Exception } } + protected void addRecordStreamingTime( Visitor visitor, long time ) + { + visitor.addMetadata( "result_consumed_after", longValue( time ) ); + } + @Override public String toString() { diff --git a/community/bolt/src/main/java/org/neo4j/bolt/v1/runtime/TransactionStateMachineSPI.java b/community/bolt/src/main/java/org/neo4j/bolt/v1/runtime/TransactionStateMachineV1SPI.java similarity index 73% rename from community/bolt/src/main/java/org/neo4j/bolt/v1/runtime/TransactionStateMachineSPI.java rename to community/bolt/src/main/java/org/neo4j/bolt/v1/runtime/TransactionStateMachineV1SPI.java index ab721df6acada..8a239fa6f3e0e 100644 --- a/community/bolt/src/main/java/org/neo4j/bolt/v1/runtime/TransactionStateMachineSPI.java +++ b/community/bolt/src/main/java/org/neo4j/bolt/v1/runtime/TransactionStateMachineV1SPI.java @@ -28,6 +28,7 @@ import org.neo4j.bolt.runtime.BoltQuerySource; import org.neo4j.bolt.runtime.BoltResult; import org.neo4j.bolt.runtime.BoltResultHandle; +import org.neo4j.bolt.runtime.TransactionStateMachineSPI; import org.neo4j.cypher.internal.javacompat.QueryResultProvider; import org.neo4j.graphdb.Result; import org.neo4j.internal.kernel.api.exceptions.KernelException; @@ -55,7 +56,7 @@ import static org.neo4j.internal.kernel.api.Transaction.Type.explicit; import static org.neo4j.internal.kernel.api.Transaction.Type.implicit; -public class TransactionStateMachineSPI implements org.neo4j.bolt.runtime.TransactionStateMachineSPI +public class TransactionStateMachineV1SPI implements TransactionStateMachineSPI { private static final PropertyContainerLocker locker = new PropertyContainerLocker(); @@ -67,7 +68,7 @@ public class TransactionStateMachineSPI implements org.neo4j.bolt.runtime.Transa private final Duration txAwaitDuration; private final Clock clock; - public TransactionStateMachineSPI( GraphDatabaseAPI db, AvailabilityGuard availabilityGuard, Duration txAwaitDuration, Clock clock ) + public TransactionStateMachineV1SPI( GraphDatabaseAPI db, AvailabilityGuard availabilityGuard, Duration txAwaitDuration, Clock clock ) { this.db = db; this.txBridge = db.getDependencyResolver().resolveDependency( ThreadToStatementContextBridge.class ); @@ -127,50 +128,12 @@ public BoltResultHandle executeQuery( BoltQuerySource querySource, LoginContext TransactionalContext transactionalContext = contextFactory.newContext( sourceDetails, internalTransaction, statement, params ); - return new BoltResultHandle() - { - - @Override - public BoltResult start() throws KernelException - { - try - { - Result result = queryExecutionEngine.executeQuery( statement, params, transactionalContext ); - if ( result instanceof QueryResultProvider ) - { - return new CypherAdapterStream( ((QueryResultProvider) result).queryResult(), clock ); - } - else - { - throw new IllegalStateException( format( "Unexpected query execution result. Expected to get instance of %s but was %s.", - QueryResultProvider.class.getName(), result.getClass().getName() ) ); - } - } - catch ( KernelException e ) - { - close( false ); - throw new QueryExecutionKernelException( e ); - } - catch ( Throwable e ) - { - close( false ); - throw e; - } - } - - @Override - public void close( boolean success ) - { - transactionalContext.close( success ); - } - - @Override - public void terminate() - { - transactionalContext.terminate(); - } + return newBoltResultHandle( statement, params, transactionalContext ); + } - }; + protected BoltResultHandle newBoltResultHandle( String statement, MapValue params, TransactionalContext transactionalContext ) + { + return new BoltResultHandleV1( statement, params, transactionalContext ); } private InternalTransaction beginTransaction( KernelTransaction.Type type, LoginContext loginContext, Duration txTimeout, Map txMetadata ) @@ -203,4 +166,63 @@ private static TransactionalContextFactory newTransactionalContextFactory( Graph GraphDatabaseQueryService queryService = db.getDependencyResolver().resolveDependency( GraphDatabaseQueryService.class ); return Neo4jTransactionalContextFactory.create( queryService, locker ); } + + public class BoltResultHandleV1 implements BoltResultHandle + { + private final String statement; + private final MapValue params; + private final TransactionalContext transactionalContext; + + public BoltResultHandleV1( String statement, MapValue params, TransactionalContext transactionalContext ) + { + this.statement = statement; + this.params = params; + this.transactionalContext = transactionalContext; + } + + @Override + public BoltResult start() throws KernelException + { + try + { + Result result = queryExecutionEngine.executeQuery( statement, params, transactionalContext ); + if ( result instanceof QueryResultProvider ) + { + return newBoltResult( (QueryResultProvider) result, clock ); + } + else + { + throw new IllegalStateException( format( "Unexpected query execution result. Expected to get instance of %s but was %s.", + QueryResultProvider.class.getName(), result.getClass().getName() ) ); + } + } + catch ( KernelException e ) + { + close( false ); + throw new QueryExecutionKernelException( e ); + } + catch ( Throwable e ) + { + close( false ); + throw e; + } + } + + protected BoltResult newBoltResult( QueryResultProvider result, Clock clock ) + { + return new CypherAdapterStream( result.queryResult(), clock ); + } + + @Override + public void close( boolean success ) + { + transactionalContext.close( success ); + } + + @Override + public void terminate() + { + transactionalContext.terminate(); + } + } } diff --git a/community/bolt/src/main/java/org/neo4j/bolt/v3/runtime/CypherAdapterStreamV3.java b/community/bolt/src/main/java/org/neo4j/bolt/v3/runtime/CypherAdapterStreamV3.java new file mode 100644 index 0000000000000..1fbf5c02449aa --- /dev/null +++ b/community/bolt/src/main/java/org/neo4j/bolt/v3/runtime/CypherAdapterStreamV3.java @@ -0,0 +1,43 @@ +/* + * Copyright (c) 2002-2018 "Neo4j," + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ +package org.neo4j.bolt.v3.runtime; + +import java.time.Clock; + +import org.neo4j.bolt.v1.runtime.CypherAdapterStream; +import org.neo4j.cypher.result.QueryResult; + +import static org.neo4j.values.storable.Values.longValue; + +class CypherAdapterStreamV3 extends CypherAdapterStream +{ + private static final String LAST_RESULT_CONSUMED_KEY = "t_last"; + + CypherAdapterStreamV3( QueryResult delegate, Clock clock ) + { + super( delegate, clock ); + } + + @Override + protected void addRecordStreamingTime( Visitor visitor, long time ) + { + visitor.addMetadata( LAST_RESULT_CONSUMED_KEY, longValue( time ) ); + } +} diff --git a/community/bolt/src/main/java/org/neo4j/bolt/v3/runtime/ReadyState.java b/community/bolt/src/main/java/org/neo4j/bolt/v3/runtime/ReadyState.java index 9a960305c85ef..2157f8041755e 100644 --- a/community/bolt/src/main/java/org/neo4j/bolt/v3/runtime/ReadyState.java +++ b/community/bolt/src/main/java/org/neo4j/bolt/v3/runtime/ReadyState.java @@ -50,8 +50,7 @@ public class ReadyState extends FailSafeBoltStateMachineState private BoltStateMachineState txReadyState; static final String FIELDS_KEY = "fields"; - static final String FIRST_RECORD_AVAILABLE_KEY = "result_available_after"; - private static final String TX_ID_KEY = "tx_id"; + static final String FIRST_RECORD_AVAILABLE_KEY = "t_first"; @Override public BoltStateMachineState process( RequestMessage message, StateMachineContext context ) throws BoltConnectionFatality @@ -103,7 +102,6 @@ private BoltStateMachineState processRunMessage( RunMessage message, StateMachin context.connectionState().onMetadata( FIELDS_KEY, stringArray( statementMetadata.fieldNames() ) ); context.connectionState().onMetadata( FIRST_RECORD_AVAILABLE_KEY, Values.longValue( end - start ) ); - context.connectionState().onMetadata( TX_ID_KEY, Values.NO_VALUE ); //TODO return tx_id return streamingState; } @@ -112,7 +110,6 @@ private BoltStateMachineState processBeginMessage( BeginMessage message, StateMa { StatementProcessor statementProcessor = context.connectionState().getStatementProcessor(); statementProcessor.beginTransaction( message.bookmark(), message.transactionTimeout(), message.transactionMetadata() ); - context.connectionState().onMetadata( TX_ID_KEY, Values.NO_VALUE ); // TODO return txReadyState; } diff --git a/community/bolt/src/main/java/org/neo4j/bolt/v3/runtime/TransactionStateMachineV3SPI.java b/community/bolt/src/main/java/org/neo4j/bolt/v3/runtime/TransactionStateMachineV3SPI.java new file mode 100644 index 0000000000000..e3e4134bd849a --- /dev/null +++ b/community/bolt/src/main/java/org/neo4j/bolt/v3/runtime/TransactionStateMachineV3SPI.java @@ -0,0 +1,62 @@ +/* + * Copyright (c) 2002-2018 "Neo4j," + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ +package org.neo4j.bolt.v3.runtime; + +import java.time.Clock; +import java.time.Duration; + +import org.neo4j.bolt.runtime.BoltResult; +import org.neo4j.bolt.runtime.BoltResultHandle; +import org.neo4j.bolt.v1.runtime.TransactionStateMachineV1SPI; +import org.neo4j.cypher.internal.javacompat.QueryResultProvider; +import org.neo4j.kernel.AvailabilityGuard; +import org.neo4j.kernel.impl.query.TransactionalContext; +import org.neo4j.kernel.internal.GraphDatabaseAPI; +import org.neo4j.values.virtual.MapValue; + +public class TransactionStateMachineV3SPI extends TransactionStateMachineV1SPI +{ + + public TransactionStateMachineV3SPI( GraphDatabaseAPI db, AvailabilityGuard availabilityGuard, Duration txAwaitDuration, Clock clock ) + { + super( db, availabilityGuard, txAwaitDuration, clock ); + } + + @Override + protected BoltResultHandle newBoltResultHandle( String statement, MapValue params, TransactionalContext transactionalContext ) + { + return new BoltResultHandleV3( statement, params, transactionalContext ); + } + + private class BoltResultHandleV3 extends BoltResultHandleV1 + { + + BoltResultHandleV3( String statement, MapValue params, TransactionalContext transactionalContext ) + { + super( statement, params, transactionalContext ); + } + + @Override + protected BoltResult newBoltResult( QueryResultProvider result, Clock clock ) + { + return new CypherAdapterStreamV3( result.queryResult(), clock ); + } + } +} diff --git a/community/bolt/src/test/java/org/neo4j/bolt/v1/runtime/TransactionStateMachineTest.java b/community/bolt/src/test/java/org/neo4j/bolt/v1/runtime/TransactionStateMachineTest.java index eda44fff6ef7f..0146f6f877c93 100644 --- a/community/bolt/src/test/java/org/neo4j/bolt/v1/runtime/TransactionStateMachineTest.java +++ b/community/bolt/src/test/java/org/neo4j/bolt/v1/runtime/TransactionStateMachineTest.java @@ -56,14 +56,14 @@ class TransactionStateMachineTest { - private TransactionStateMachineSPI stateMachineSPI; + private TransactionStateMachineV1SPI stateMachineSPI; private TransactionStateMachine.MutableTransactionState mutableState; private TransactionStateMachine stateMachine; @BeforeEach void createMocks() { - stateMachineSPI = mock( TransactionStateMachineSPI.class ); + stateMachineSPI = mock( TransactionStateMachineV1SPI.class ); mutableState = mock(TransactionStateMachine.MutableTransactionState.class); stateMachine = new TransactionStateMachine( stateMachineSPI, AUTH_DISABLED, new FakeClock() ); } @@ -153,7 +153,7 @@ void shouldAwaitMultipleBookmarksWhenBothSingleAndMultipleSupplied() throws Exce @Test void shouldStartWithAutoCommitState() { - TransactionStateMachineSPI stateMachineSPI = mock( TransactionStateMachineSPI.class ); + TransactionStateMachineV1SPI stateMachineSPI = mock( TransactionStateMachineV1SPI.class ); TransactionStateMachine stateMachine = newTransactionStateMachine( stateMachineSPI ); assertThat( stateMachine.state, is( TransactionStateMachine.State.AUTO_COMMIT ) ); @@ -166,7 +166,7 @@ void shouldStartWithAutoCommitState() void shouldDoNothingInAutoCommitTransactionUponInitialisationWhenValidated() throws Exception { KernelTransaction transaction = newTimedOutTransaction(); - TransactionStateMachineSPI stateMachineSPI = newTransactionStateMachineSPI( transaction ); + TransactionStateMachineV1SPI stateMachineSPI = newTransactionStateMachineSPI( transaction ); TransactionStateMachine stateMachine = newTransactionStateMachine( stateMachineSPI ); // We're in auto-commit state @@ -188,7 +188,7 @@ void shouldDoNothingInAutoCommitTransactionUponInitialisationWhenValidated() thr void shouldResetInAutoCommitTransactionWhileStatementIsRunningWhenValidated() throws Exception { KernelTransaction transaction = newTimedOutTransaction(); - TransactionStateMachineSPI stateMachineSPI = newTransactionStateMachineSPI( transaction ); + TransactionStateMachineV1SPI stateMachineSPI = newTransactionStateMachineSPI( transaction ); TransactionStateMachine stateMachine = newTransactionStateMachine( stateMachineSPI ); // We're in auto-commit state @@ -218,7 +218,7 @@ void shouldResetInAutoCommitTransactionWhileStatementIsRunningWhenValidated() th void shouldResetInExplicitTransactionUponTxBeginWhenValidated() throws Exception { KernelTransaction transaction = newTimedOutTransaction(); - TransactionStateMachineSPI stateMachineSPI = newTransactionStateMachineSPI( transaction ); + TransactionStateMachineV1SPI stateMachineSPI = newTransactionStateMachineSPI( transaction ); TransactionStateMachine stateMachine = newTransactionStateMachine( stateMachineSPI ); // start an explicit transaction @@ -243,7 +243,7 @@ void shouldResetInExplicitTransactionUponTxBeginWhenValidated() throws Exception void shouldResetInExplicitTransactionWhileStatementIsRunningWhenValidated() throws Exception { KernelTransaction transaction = newTimedOutTransaction(); - TransactionStateMachineSPI stateMachineSPI = newTransactionStateMachineSPI( transaction ); + TransactionStateMachineV1SPI stateMachineSPI = newTransactionStateMachineSPI( transaction ); TransactionStateMachine stateMachine = newTransactionStateMachine( stateMachineSPI ); // start an explicit transaction @@ -270,7 +270,7 @@ void shouldResetInExplicitTransactionWhileStatementIsRunningWhenValidated() thro void shouldUnbindTxAfterRun() throws Exception { KernelTransaction transaction = newTimedOutTransaction(); - TransactionStateMachineSPI stateMachineSPI = newTransactionStateMachineSPI( transaction ); + TransactionStateMachineV1SPI stateMachineSPI = newTransactionStateMachineSPI( transaction ); TransactionStateMachine stateMachine = newTransactionStateMachine( stateMachineSPI ); stateMachine.run( "SOME STATEMENT", null ); @@ -282,7 +282,7 @@ void shouldUnbindTxAfterRun() throws Exception void shouldUnbindTxAfterStreamResult() throws Exception { KernelTransaction transaction = newTimedOutTransaction(); - TransactionStateMachineSPI stateMachineSPI = newTransactionStateMachineSPI( transaction ); + TransactionStateMachineV1SPI stateMachineSPI = newTransactionStateMachineSPI( transaction ); TransactionStateMachine stateMachine = newTransactionStateMachine( stateMachineSPI ); stateMachine.run( "SOME STATEMENT", null ); @@ -298,7 +298,7 @@ void shouldUnbindTxAfterStreamResult() throws Exception void shouldThrowDuringRunIfPendingTerminationNoticeExists() throws Exception { KernelTransaction transaction = newTimedOutTransaction(); - TransactionStateMachineSPI stateMachineSPI = newTransactionStateMachineSPI( transaction ); + TransactionStateMachineV1SPI stateMachineSPI = newTransactionStateMachineSPI( transaction ); TransactionStateMachine stateMachine = newTransactionStateMachine( stateMachineSPI ); stateMachine.ctx.pendingTerminationNotice = Status.Transaction.TransactionTimedOut; @@ -313,7 +313,7 @@ void shouldThrowDuringRunIfPendingTerminationNoticeExists() throws Exception void shouldThrowDuringStreamResultIfPendingTerminationNoticeExists() throws Exception { KernelTransaction transaction = newTimedOutTransaction(); - TransactionStateMachineSPI stateMachineSPI = newTransactionStateMachineSPI( transaction ); + TransactionStateMachineV1SPI stateMachineSPI = newTransactionStateMachineSPI( transaction ); TransactionStateMachine stateMachine = newTransactionStateMachine( stateMachineSPI ); stateMachine.run( "SOME STATEMENT", null ); @@ -333,7 +333,7 @@ void shouldCloseResultAndTransactionHandlesWhenExecutionFails() throws Exception { KernelTransaction transaction = newTransaction(); BoltResultHandle resultHandle = newResultHandle( new RuntimeException( "some error" ) ); - TransactionStateMachineSPI stateMachineSPI = newTransactionStateMachineSPI( transaction, resultHandle ); + TransactionStateMachineV1SPI stateMachineSPI = newTransactionStateMachineSPI( transaction, resultHandle ); TransactionStateMachine stateMachine = newTransactionStateMachine( stateMachineSPI ); RuntimeException e = assertThrows( RuntimeException.class, () -> stateMachine.run( "SOME STATEMENT", null ) ); @@ -348,7 +348,7 @@ void shouldCloseResultAndTransactionHandlesWhenExecutionFails() throws Exception void shouldCloseResultAndTransactionHandlesWhenConsumeFails() throws Exception { KernelTransaction transaction = newTransaction(); - TransactionStateMachineSPI stateMachineSPI = newTransactionStateMachineSPI( transaction ); + TransactionStateMachineV1SPI stateMachineSPI = newTransactionStateMachineSPI( transaction ); TransactionStateMachine stateMachine = newTransactionStateMachine( stateMachineSPI ); stateMachine.run( "SOME STATEMENT", null ); @@ -375,7 +375,7 @@ void shouldCloseResultHandlesWhenExecutionFailsInExplicitTransaction() throws Ex { KernelTransaction transaction = newTransaction(); BoltResultHandle resultHandle = newResultHandle( new RuntimeException( "some error" ) ); - TransactionStateMachineSPI stateMachineSPI = newTransactionStateMachineSPI( transaction, resultHandle ); + TransactionStateMachineV1SPI stateMachineSPI = newTransactionStateMachineSPI( transaction, resultHandle ); TransactionStateMachine stateMachine = newTransactionStateMachine( stateMachineSPI ); RuntimeException e = assertThrows( RuntimeException.class, () -> @@ -398,7 +398,7 @@ void shouldCloseResultHandlesWhenExecutionFailsInExplicitTransaction() throws Ex void shouldCloseResultHandlesWhenConsumeFailsInExplicitTransaction() throws Exception { KernelTransaction transaction = newTransaction(); - TransactionStateMachineSPI stateMachineSPI = newTransactionStateMachineSPI( transaction ); + TransactionStateMachineV1SPI stateMachineSPI = newTransactionStateMachineSPI( transaction ); TransactionStateMachine stateMachine = newTransactionStateMachine( stateMachineSPI ); stateMachine.beginTransaction( null ); @@ -443,7 +443,7 @@ private static KernelTransaction newTimedOutTransaction() return transaction; } - private static TransactionStateMachine newTransactionStateMachine( TransactionStateMachineSPI stateMachineSPI ) + private static TransactionStateMachine newTransactionStateMachine( TransactionStateMachineV1SPI stateMachineSPI ) { return new TransactionStateMachine( stateMachineSPI, AUTH_DISABLED, new FakeClock() ); } @@ -453,10 +453,10 @@ private static MapValue map( Object... keyValues ) return ValueUtils.asMapValue( MapUtil.map( keyValues ) ); } - private static TransactionStateMachineSPI newTransactionStateMachineSPI( KernelTransaction transaction ) throws KernelException + private static TransactionStateMachineV1SPI newTransactionStateMachineSPI( KernelTransaction transaction ) throws KernelException { BoltResultHandle resultHandle = newResultHandle(); - TransactionStateMachineSPI stateMachineSPI = mock( TransactionStateMachineSPI.class ); + TransactionStateMachineV1SPI stateMachineSPI = mock( TransactionStateMachineV1SPI.class ); when( stateMachineSPI.beginTransaction( any(), any(), any() ) ).thenReturn( transaction ); when( stateMachineSPI.executeQuery( any(), any(), anyString(), any(), any(), any() ) ).thenReturn( resultHandle ); @@ -464,10 +464,10 @@ private static TransactionStateMachineSPI newTransactionStateMachineSPI( KernelT return stateMachineSPI; } - private static TransactionStateMachineSPI newTransactionStateMachineSPI( KernelTransaction transaction, + private static TransactionStateMachineV1SPI newTransactionStateMachineSPI( KernelTransaction transaction, BoltResultHandle resultHandle ) throws KernelException { - TransactionStateMachineSPI stateMachineSPI = mock( TransactionStateMachineSPI.class ); + TransactionStateMachineV1SPI stateMachineSPI = mock( TransactionStateMachineV1SPI.class ); when( stateMachineSPI.beginTransaction( any(), any(), any() ) ).thenReturn( transaction ); when( stateMachineSPI.executeQuery( any(), any(), anyString(), any(), any(), any() ) ).thenReturn( resultHandle ); diff --git a/community/bolt/src/test/java/org/neo4j/bolt/v1/runtime/TransactionStateMachineSPITest.java b/community/bolt/src/test/java/org/neo4j/bolt/v1/runtime/TransactionStateMachineV1SPITest.java similarity index 89% rename from community/bolt/src/test/java/org/neo4j/bolt/v1/runtime/TransactionStateMachineSPITest.java rename to community/bolt/src/test/java/org/neo4j/bolt/v1/runtime/TransactionStateMachineV1SPITest.java index 6a3b29e3ca0cf..0134e888a0840 100644 --- a/community/bolt/src/test/java/org/neo4j/bolt/v1/runtime/TransactionStateMachineSPITest.java +++ b/community/bolt/src/test/java/org/neo4j/bolt/v1/runtime/TransactionStateMachineV1SPITest.java @@ -48,7 +48,7 @@ import static org.mockito.Mockito.spy; import static org.mockito.Mockito.when; -public class TransactionStateMachineSPITest +public class TransactionStateMachineV1SPITest { @Rule public final OtherThreadRule otherThread = new OtherThreadRule<>(); @@ -71,7 +71,7 @@ public void throwsWhenTxAwaitDurationExpires() return available; } ); - TransactionStateMachineSPI txSpi = createTxSpi( txIdStore, txAwaitDuration, availabilityGuard, clock ); + TransactionStateMachineV1SPI txSpi = createTxSpi( txIdStore, txAwaitDuration, availabilityGuard, clock ); Future result = otherThread.execute( state -> { @@ -96,7 +96,7 @@ public void doesNotWaitWhenTxIdUpToDate() throws Exception long lastClosedTransactionId = 100; Supplier txIdStore = () -> fixedTxIdStore( lastClosedTransactionId ); - TransactionStateMachineSPI txSpi = createTxSpi( txIdStore, Duration.ZERO, Clock.systemUTC() ); + TransactionStateMachineV1SPI txSpi = createTxSpi( txIdStore, Duration.ZERO, Clock.systemUTC() ); Future result = otherThread.execute( state -> { @@ -114,14 +114,14 @@ private static TransactionIdStore fixedTxIdStore( long lastClosedTransactionId ) return txIdStore; } - private static TransactionStateMachineSPI createTxSpi( Supplier txIdStore, Duration txAwaitDuration, + private static TransactionStateMachineV1SPI createTxSpi( Supplier txIdStore, Duration txAwaitDuration, Clock clock ) { AvailabilityGuard availabilityGuard = new AvailabilityGuard( clock, NullLog.getInstance() ); return createTxSpi( txIdStore, txAwaitDuration, availabilityGuard, clock ); } - private static TransactionStateMachineSPI createTxSpi( Supplier txIdStore, Duration txAwaitDuration, + private static TransactionStateMachineV1SPI createTxSpi( Supplier txIdStore, Duration txAwaitDuration, AvailabilityGuard availabilityGuard, Clock clock ) { QueryExecutionEngine queryExecutionEngine = mock( QueryExecutionEngine.class ); @@ -139,6 +139,6 @@ private static TransactionStateMachineSPI createTxSpi( Supplier> entryFieldMatcher = hasEntry( is( "fields" ), equalTo( asList( "a", "a_squared" ) ) ); Matcher> entryTypeMatcher = hasEntry( is( "type" ), equalTo( "r" ) ); assertThat( connection, util.eventuallyReceives( - msgSuccess( allOf( entryFieldMatcher, hasKey( "tx_id" ), hasKey( "result_available_after" ) ) ), + msgSuccess( allOf( entryFieldMatcher, hasKey( "t_first" ) ) ), msgRecord( eqRecord( equalTo( longValue( 1L ) ), equalTo( longValue( 1L ) ) ) ), msgRecord( eqRecord( equalTo( longValue( 2L ) ), equalTo( longValue( 4L ) ) ) ), msgRecord( eqRecord( equalTo( longValue( 3L ) ), equalTo( longValue( 9L ) ) ) ), - msgSuccess( allOf( entryTypeMatcher, hasKey( "result_consumed_after" ), hasKey( "bookmark" ) ) ) ) ); + msgSuccess( allOf( entryTypeMatcher, hasKey( "t_last" ), hasKey( "bookmark" ) ) ) ) ); } @Test @@ -173,8 +173,8 @@ public void shouldRespondWithMetadataToDiscardAll() throws Throwable Matcher> entryTypeMatcher = hasEntry( is( "type" ), equalTo( "r" ) ); Matcher> entryFieldsMatcher = hasEntry( is( "fields" ), equalTo( asList( "a", "a_squared" ) ) ); assertThat( connection, util.eventuallyReceives( - msgSuccess( allOf( entryFieldsMatcher, hasKey( "tx_id" ), hasKey( "result_available_after" ) ) ), - msgSuccess( allOf( entryTypeMatcher, hasKey( "result_consumed_after" ), hasKey( "bookmark" ) ) ) ) ); + msgSuccess( allOf( entryFieldsMatcher, hasKey( "t_first" ) ) ), + msgSuccess( allOf( entryTypeMatcher, hasKey( "t_last" ), hasKey( "bookmark" ) ) ) ) ); } @Test @@ -192,12 +192,12 @@ public void shouldRunSimpleStatementInTx() throws Throwable Matcher> entryFieldMatcher = hasEntry( is( "fields" ), equalTo( asList( "a", "a_squared" ) ) ); Matcher> entryTypeMatcher = hasEntry( is( "type" ), equalTo( "r" ) ); assertThat( connection, util.eventuallyReceives( - msgSuccess( allOf( hasKey( "tx_id" ) ) ), - msgSuccess( allOf( entryFieldMatcher, not( hasKey( "tx_id" ) ), hasKey( "result_available_after" ) ) ), + msgSuccess(), + msgSuccess( allOf( entryFieldMatcher, hasKey( "t_first" ) ) ), msgRecord( eqRecord( equalTo( longValue( 1L ) ), equalTo( longValue( 1L ) ) ) ), msgRecord( eqRecord( equalTo( longValue( 2L ) ), equalTo( longValue( 4L ) ) ) ), msgRecord( eqRecord( equalTo( longValue( 3L ) ), equalTo( longValue( 9L ) ) ) ), - msgSuccess( allOf( entryTypeMatcher, hasKey( "result_consumed_after" ), not( hasKey( "bookmark" ) ) ) ), + msgSuccess( allOf( entryTypeMatcher, hasKey( "t_last" ), not( hasKey( "bookmark" ) ) ) ), msgSuccess( allOf( hasKey( "bookmark" ) ) ) ) ); } @@ -216,12 +216,12 @@ public void shouldAllowRollbackSimpleStatementInTx() throws Throwable Matcher> entryFieldMatcher = hasEntry( is( "fields" ), equalTo( asList( "a", "a_squared" ) ) ); Matcher> entryTypeMatcher = hasEntry( is( "type" ), equalTo( "r" ) ); assertThat( connection, util.eventuallyReceives( - msgSuccess( allOf( hasKey( "tx_id" ) ) ), - msgSuccess( allOf( entryFieldMatcher, not( hasKey( "tx_id" ) ), hasKey( "result_available_after" ) ) ), + msgSuccess(), + msgSuccess( allOf( entryFieldMatcher, hasKey( "t_first" ) ) ), msgRecord( eqRecord( equalTo( longValue( 1L ) ), equalTo( longValue( 1L ) ) ) ), msgRecord( eqRecord( equalTo( longValue( 2L ) ), equalTo( longValue( 4L ) ) ) ), msgRecord( eqRecord( equalTo( longValue( 3L ) ), equalTo( longValue( 9L ) ) ) ), - msgSuccess( allOf( entryTypeMatcher, hasKey( "result_consumed_after" ), not( hasKey( "bookmark" ) ) ) ), + msgSuccess( allOf( entryTypeMatcher, hasKey( "t_last" ), not( hasKey( "bookmark" ) ) ) ), msgSuccess() ) ); } @@ -263,7 +263,7 @@ public void shouldRunProcedure() throws Throwable Matcher> ageMatcher = hasEntry( is( "fields" ), equalTo( singletonList( "age" ) ) ); assertThat( connection, util.eventuallyReceives( - msgSuccess( allOf( ageMatcher, hasKey( "tx_id" ), hasKey( "result_available_after" ) ) ), + msgSuccess( allOf( ageMatcher, hasKey( "t_first" ) ) ), msgRecord( eqRecord( equalTo( longValue( 2L ) ) ) ), msgSuccess() ) ); @@ -275,7 +275,7 @@ public void shouldRunProcedure() throws Throwable // Then Matcher> entryFieldsMatcher = hasEntry( is( "fields" ), equalTo( singletonList( "label" ) ) ); assertThat( connection, util.eventuallyReceives( - msgSuccess( allOf( entryFieldsMatcher, hasKey( "tx_id" ), hasKey( "result_available_after" ) ) ), + msgSuccess( allOf( entryFieldsMatcher, hasKey( "t_first" ) ) ), msgRecord( eqRecord( Matchers.equalTo( stringValue( "Test" ) ) ) ), msgSuccess() ) ); @@ -293,7 +293,7 @@ public void shouldHandleDeletedNodes() throws Throwable // Then Matcher> entryFieldsMatcher = hasEntry( is( "fields" ), equalTo( singletonList( "n" ) ) ); assertThat( connection, util.eventuallyReceives( - msgSuccess( allOf( entryFieldsMatcher, hasKey("tx_id"), hasKey( "result_available_after" ) ) ) ) ); + msgSuccess( allOf( entryFieldsMatcher, hasKey( "t_first" ) ) ) ) ); // //Record(0x71) { @@ -320,7 +320,7 @@ public void shouldHandleDeletedRelationships() throws Throwable // Then Matcher> entryFieldsMatcher = hasEntry( is( "fields" ), equalTo( singletonList( "r" ) ) ); assertThat( connection, util.eventuallyReceives( - msgSuccess( allOf( entryFieldsMatcher, hasKey( "tx_id" ), hasKey( "result_available_after" ) ) ) ) ); + msgSuccess( allOf( entryFieldsMatcher, hasKey( "t_first" ) ) ) ) ); // //Record(0x71) { @@ -360,7 +360,7 @@ public void shouldNotLeakStatsToNextStatement() throws Throwable assertThat( connection, util.eventuallyReceives( msgSuccess(), msgRecord( eqRecord( equalTo( longValue( 1L ) ) ) ), - msgSuccess( allOf( typeMatcher, hasKey( "result_consumed_after" ) ) ) ) ); + msgSuccess( allOf( typeMatcher, hasKey( "t_last" ) ) ) ) ); } private byte[] bytes( int... ints ) diff --git a/community/community-it/bolt-it/src/test/java/org/neo4j/bolt/v3/runtime/integration/ReadyStateIT.java b/community/community-it/bolt-it/src/test/java/org/neo4j/bolt/v3/runtime/integration/ReadyStateIT.java index 787a121a10795..55c389799219e 100644 --- a/community/community-it/bolt-it/src/test/java/org/neo4j/bolt/v3/runtime/integration/ReadyStateIT.java +++ b/community/community-it/bolt-it/src/test/java/org/neo4j/bolt/v3/runtime/integration/ReadyStateIT.java @@ -74,8 +74,7 @@ void shouldMoveToStreamingOnRun_succ() throws Throwable RecordedBoltResponse response = recorder.nextResponse(); assertThat( response, succeeded() ); assertTrue( response.hasMetadata( "fields" ) ); - assertTrue( response.hasMetadata( "result_available_after") ); - assertTrue( response.hasMetadata( "tx_id") ); + assertTrue( response.hasMetadata( "t_first") ); assertThat( machine.state(), instanceOf( StreamingState.class ) ); } @@ -93,7 +92,6 @@ void shouldMoveToStreamingOnBegin_succ() throws Throwable // Then RecordedBoltResponse response = recorder.nextResponse(); assertThat( response, succeeded() ); - assertTrue( response.hasMetadata( "tx_id") ); assertThat( machine.state(), instanceOf( TransactionReadyState.class ) ); } diff --git a/community/community-it/bolt-it/src/test/java/org/neo4j/bolt/v3/runtime/integration/StreamingStateIT.java b/community/community-it/bolt-it/src/test/java/org/neo4j/bolt/v3/runtime/integration/StreamingStateIT.java index 4760a6cc7dc05..9f84122028e3e 100644 --- a/community/community-it/bolt-it/src/test/java/org/neo4j/bolt/v3/runtime/integration/StreamingStateIT.java +++ b/community/community-it/bolt-it/src/test/java/org/neo4j/bolt/v3/runtime/integration/StreamingStateIT.java @@ -74,7 +74,7 @@ void shouldMoveFromStreamingToReadyOnPullAll_succ() throws Throwable RecordedBoltResponse response = recorder.nextResponse(); assertThat( response, succeeded() ); assertTrue( response.hasMetadata( "type" ) ); - assertTrue( response.hasMetadata( "result_consumed_after" ) ); + assertTrue( response.hasMetadata( "t_last" ) ); assertTrue( response.hasMetadata( "bookmark" ) ); assertThat( machine.state(), instanceOf( ReadyState.class ) ); } diff --git a/community/community-it/bolt-it/src/test/java/org/neo4j/bolt/v3/runtime/integration/TransactionReadyStateIT.java b/community/community-it/bolt-it/src/test/java/org/neo4j/bolt/v3/runtime/integration/TransactionReadyStateIT.java index 966fd6226a8c9..3f2832f08d407 100644 --- a/community/community-it/bolt-it/src/test/java/org/neo4j/bolt/v3/runtime/integration/TransactionReadyStateIT.java +++ b/community/community-it/bolt-it/src/test/java/org/neo4j/bolt/v3/runtime/integration/TransactionReadyStateIT.java @@ -74,8 +74,7 @@ void shouldMoveToStreamingOnRun_succ() throws Throwable // Then RecordedBoltResponse response = recorder.nextResponse(); assertTrue( response.hasMetadata( "fields" ) ); - assertTrue( response.hasMetadata( "result_available_after" ) ); - assertFalse( response.hasMetadata( "tx_id" ) ); + assertTrue( response.hasMetadata( "t_first" ) ); assertThat( machine.state(), instanceOf( TransactionStreamingState.class ) ); } diff --git a/community/community-it/bolt-it/src/test/java/org/neo4j/bolt/v3/runtime/integration/TransactionStreamingStateIT.java b/community/community-it/bolt-it/src/test/java/org/neo4j/bolt/v3/runtime/integration/TransactionStreamingStateIT.java index 01433fa0c4b5d..fead84dbf0738 100644 --- a/community/community-it/bolt-it/src/test/java/org/neo4j/bolt/v3/runtime/integration/TransactionStreamingStateIT.java +++ b/community/community-it/bolt-it/src/test/java/org/neo4j/bolt/v3/runtime/integration/TransactionStreamingStateIT.java @@ -92,7 +92,7 @@ void shouldMoveFromTxStreamingToTxReadyOnPullAll_succ() throws Throwable RecordedBoltResponse response = recorder.nextResponse(); assertThat( response, succeeded() ); assertTrue( response.hasMetadata( "type" ) ); - assertTrue( response.hasMetadata( "result_consumed_after" ) ); + assertTrue( response.hasMetadata( "t_last" ) ); assertFalse( response.hasMetadata( "bookmark" ) ); assertThat( machine.state(), instanceOf( TransactionReadyState.class ) ); }