From f8d8157169920f3ec74111b0d512502ee3cce613 Mon Sep 17 00:00:00 2001 From: Mark Needham Date: Wed, 29 Mar 2017 09:20:26 +0100 Subject: [PATCH] Handle LocalDatabase restart breaking bolt connections due to TransactionIdStore not being refreshed * Introduce CANCELLED state so that polling process doesn't get restarted unless we explicitly want to restart it * Pass around a supplier of TransactionIdStore and resolve it in TransactionIdTracker * Bumped drivers to 1.2 --- .../bolt/v1/runtime/BoltFactoryImpl.java | 9 +- .../runtime/TransactionStateMachineSPI.java | 5 +- .../bolt/v1/runtime/BoltFactoryImplTest.java | 4 +- .../TransactionStateMachineSPITest.java | 21 ++- .../api/txtracking/TransactionIdTracker.java | 23 ++- .../txtracking/TransactionIdTrackerTest.java | 16 +- .../catchup/tx/CatchupPollingProcess.java | 9 +- .../scenarios/ReadReplicaReplicationIT.java | 5 +- .../java/org/neo4j/bolt/BoltFailuresIT.java | 42 +++-- .../scenarios/BoltCausalClusteringIT.java | 165 +++++++++++------- pom.xml | 2 +- 11 files changed, 199 insertions(+), 102 deletions(-) diff --git a/community/bolt/src/main/java/org/neo4j/bolt/v1/runtime/BoltFactoryImpl.java b/community/bolt/src/main/java/org/neo4j/bolt/v1/runtime/BoltFactoryImpl.java index b70b3dc55b896..f230f417fcdb3 100644 --- a/community/bolt/src/main/java/org/neo4j/bolt/v1/runtime/BoltFactoryImpl.java +++ b/community/bolt/src/main/java/org/neo4j/bolt/v1/runtime/BoltFactoryImpl.java @@ -21,6 +21,7 @@ import java.time.Clock; import java.time.Duration; +import java.util.function.Supplier; import org.neo4j.bolt.security.auth.Authentication; import org.neo4j.graphdb.DependencyResolver; @@ -49,8 +50,8 @@ public class BoltFactoryImpl extends LifecycleAdapter implements BoltFactory private QueryExecutionEngine queryExecutionEngine; private GraphDatabaseQueryService queryService; - private TransactionIdStore transactionIdStore; private AvailabilityGuard availabilityGuard; + private DependencyResolver dependencyResolver; public BoltFactoryImpl( GraphDatabaseAPI gds, UsageData usageData, LogService logging, ThreadToStatementContextBridge txBridge, Authentication authentication, @@ -68,10 +69,9 @@ public BoltFactoryImpl( GraphDatabaseAPI gds, UsageData usageData, LogService lo @Override public void start() throws Throwable { - DependencyResolver dependencyResolver = gds.getDependencyResolver(); + dependencyResolver = gds.getDependencyResolver(); queryExecutionEngine = dependencyResolver.resolveDependency( QueryExecutionEngine.class ); queryService = dependencyResolver.resolveDependency( GraphDatabaseQueryService.class ); - transactionIdStore = dependencyResolver.resolveDependency( TransactionIdStore.class ); availabilityGuard = dependencyResolver.resolveDependency( AvailabilityGuard.class ); } @@ -80,7 +80,6 @@ public void stop() throws Throwable { queryExecutionEngine = null; queryService = null; - transactionIdStore = null; availabilityGuard = null; } @@ -98,7 +97,7 @@ private TransactionStateMachine.SPI createTxSpi( Clock clock ) long bookmarkReadyTimeout = config.get( GraphDatabaseSettings.bookmark_ready_timeout ); Duration txAwaitDuration = Duration.ofMillis( bookmarkReadyTimeout ); - return new TransactionStateMachineSPI( gds, txBridge, queryExecutionEngine, transactionIdStore, + return new TransactionStateMachineSPI( gds, txBridge, queryExecutionEngine, availabilityGuard, queryService, txAwaitDuration, clock ); } } diff --git a/community/bolt/src/main/java/org/neo4j/bolt/v1/runtime/TransactionStateMachineSPI.java b/community/bolt/src/main/java/org/neo4j/bolt/v1/runtime/TransactionStateMachineSPI.java index 7df835af96cf7..d4723a12891e9 100644 --- a/community/bolt/src/main/java/org/neo4j/bolt/v1/runtime/TransactionStateMachineSPI.java +++ b/community/bolt/src/main/java/org/neo4j/bolt/v1/runtime/TransactionStateMachineSPI.java @@ -22,6 +22,7 @@ import java.time.Clock; import java.time.Duration; import java.util.Map; +import java.util.function.Supplier; import org.neo4j.bolt.v1.runtime.TransactionStateMachine.BoltResultHandle; import org.neo4j.bolt.v1.runtime.spi.BoltResult; @@ -64,7 +65,6 @@ class TransactionStateMachineSPI implements TransactionStateMachine.SPI TransactionStateMachineSPI( GraphDatabaseAPI db, ThreadToStatementContextBridge txBridge, QueryExecutionEngine queryExecutionEngine, - TransactionIdStore transactionIdStoreSupplier, AvailabilityGuard availabilityGuard, GraphDatabaseQueryService queryService, Duration txAwaitDuration, @@ -73,7 +73,10 @@ class TransactionStateMachineSPI implements TransactionStateMachine.SPI this.db = db; this.txBridge = txBridge; this.queryExecutionEngine = queryExecutionEngine; + + Supplier transactionIdStoreSupplier = db.getDependencyResolver().provideDependency( TransactionIdStore.class ); this.transactionIdTracker = new TransactionIdTracker( transactionIdStoreSupplier, availabilityGuard, clock ); + this.contextFactory = Neo4jTransactionalContextFactory.create( queryService, locker ); this.queryService = queryService; this.txAwaitDuration = txAwaitDuration; diff --git a/community/bolt/src/test/java/org/neo4j/bolt/v1/runtime/BoltFactoryImplTest.java b/community/bolt/src/test/java/org/neo4j/bolt/v1/runtime/BoltFactoryImplTest.java index f5fa7886f4372..732b63f66c099 100644 --- a/community/bolt/src/test/java/org/neo4j/bolt/v1/runtime/BoltFactoryImplTest.java +++ b/community/bolt/src/test/java/org/neo4j/bolt/v1/runtime/BoltFactoryImplTest.java @@ -98,8 +98,8 @@ public void txIdStoreRefreshedAfterRestart() throws Throwable when( txIdStoreBeforeRestart.getLastClosedTransactionId() ).thenReturn( 42L ); TransactionIdStore txIdStoreAfterRestart = mock( TransactionIdStore.class ); when( txIdStoreAfterRestart.getLastClosedTransactionId() ).thenReturn( 4242L ); - when( dependencyResolver.resolveDependency( TransactionIdStore.class ) ) - .thenReturn( txIdStoreBeforeRestart ).thenReturn( txIdStoreAfterRestart ); + when( dependencyResolver.provideDependency( TransactionIdStore.class ) ) + .thenReturn( () -> txIdStoreBeforeRestart ).thenReturn( () -> txIdStoreAfterRestart ); BoltFactoryImpl boltFactory = newBoltFactory( db ); diff --git a/community/bolt/src/test/java/org/neo4j/bolt/v1/runtime/TransactionStateMachineSPITest.java b/community/bolt/src/test/java/org/neo4j/bolt/v1/runtime/TransactionStateMachineSPITest.java index 4395ea7141a72..c9b511c3684f5 100644 --- a/community/bolt/src/test/java/org/neo4j/bolt/v1/runtime/TransactionStateMachineSPITest.java +++ b/community/bolt/src/test/java/org/neo4j/bolt/v1/runtime/TransactionStateMachineSPITest.java @@ -26,6 +26,7 @@ import java.time.Duration; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; +import java.util.function.Supplier; import org.neo4j.graphdb.DependencyResolver; import org.neo4j.kernel.AvailabilityGuard; @@ -56,7 +57,7 @@ public class TransactionStateMachineSPITest public void throwsWhenTxAwaitDurationExpires() throws Exception { long lastClosedTransactionId = 100; - TransactionIdStore txIdStore = fixedTxIdStore( lastClosedTransactionId ); + Supplier txIdStore = () -> fixedTxIdStore( lastClosedTransactionId ); Duration txAwaitDuration = Duration.ofSeconds( 42 ); FakeClock clock = new FakeClock(); @@ -93,7 +94,7 @@ public void throwsWhenTxAwaitDurationExpires() throws Exception public void doesNotWaitWhenTxIdUpToDate() throws Exception { long lastClosedTransactionId = 100; - TransactionIdStore txIdStore = fixedTxIdStore( lastClosedTransactionId ); + Supplier txIdStore = () -> fixedTxIdStore( lastClosedTransactionId ); TransactionStateMachineSPI txSpi = createTxSpi( txIdStore, Duration.ZERO, Clock.systemUTC() ); @@ -113,21 +114,27 @@ private static TransactionIdStore fixedTxIdStore( long lastClosedTransactionId ) return txIdStore; } - private static TransactionStateMachineSPI createTxSpi( TransactionIdStore txIdStore, Duration txAwaitDuration, + private static TransactionStateMachineSPI createTxSpi( Supplier txIdStore, Duration txAwaitDuration, Clock clock ) { AvailabilityGuard availabilityGuard = new AvailabilityGuard( clock, NullLog.getInstance() ); return createTxSpi( txIdStore, txAwaitDuration, availabilityGuard, clock ); } - private static TransactionStateMachineSPI createTxSpi( TransactionIdStore txIdStore, Duration txAwaitDuration, + private static TransactionStateMachineSPI createTxSpi( Supplier txIdStore, Duration txAwaitDuration, AvailabilityGuard availabilityGuard, Clock clock ) { GraphDatabaseQueryService queryService = mock( GraphDatabaseQueryService.class ); - when( queryService.getDependencyResolver() ).thenReturn( mock( DependencyResolver.class ) ); + DependencyResolver dependencyResolver = mock( DependencyResolver.class ); + GraphDatabaseAPI db = mock( GraphDatabaseAPI.class ); - return new TransactionStateMachineSPI( mock( GraphDatabaseAPI.class ), new ThreadToStatementContextBridge(), - mock( QueryExecutionEngine.class ), txIdStore, availabilityGuard, queryService, txAwaitDuration, + when( queryService.getDependencyResolver() ).thenReturn( dependencyResolver ); + when( db.getDependencyResolver() ).thenReturn( dependencyResolver ); + + when(dependencyResolver.provideDependency( TransactionIdStore.class )).thenReturn( txIdStore ); + + return new TransactionStateMachineSPI( db, new ThreadToStatementContextBridge(), + mock( QueryExecutionEngine.class ), availabilityGuard, queryService, txAwaitDuration, clock ); } } diff --git a/community/kernel/src/main/java/org/neo4j/kernel/api/txtracking/TransactionIdTracker.java b/community/kernel/src/main/java/org/neo4j/kernel/api/txtracking/TransactionIdTracker.java index cbeb366dc4682..99385ffc97118 100644 --- a/community/kernel/src/main/java/org/neo4j/kernel/api/txtracking/TransactionIdTracker.java +++ b/community/kernel/src/main/java/org/neo4j/kernel/api/txtracking/TransactionIdTracker.java @@ -22,6 +22,7 @@ import java.time.Clock; import java.time.Duration; import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; import org.neo4j.kernel.AvailabilityGuard; import org.neo4j.kernel.api.exceptions.Status; @@ -40,15 +41,15 @@ public class TransactionIdTracker private static final int POLL_INTERVAL = 25; private static final TimeUnit POLL_UNIT = TimeUnit.MILLISECONDS; - private final TransactionIdStore transactionIdStore; + private final Supplier transactionIdStoreSupplier; private final AvailabilityGuard availabilityGuard; private final Clock clock; - public TransactionIdTracker( TransactionIdStore transactionIdStore, AvailabilityGuard availabilityGuard, - Clock clock ) + public TransactionIdTracker( Supplier transactionIdStoreSupplier, AvailabilityGuard availabilityGuard, + Clock clock ) { this.availabilityGuard = availabilityGuard; - this.transactionIdStore = transactionIdStore; + this.transactionIdStoreSupplier = transactionIdStoreSupplier; this.clock = clock; } @@ -86,7 +87,7 @@ public void awaitUpToDate( long oldestAcceptableTxId, Duration timeout ) throws { throw new TransactionFailureException( Status.Transaction.InstanceStateChanged, "Database not up to the requested version: %d. Latest database version is %d", oldestAcceptableTxId, - transactionIdStore.getLastClosedTransactionId() ); + transactionIdStore().getLastClosedTransactionId() ); } } @@ -97,7 +98,15 @@ private boolean isReady( long oldestAcceptableTxId ) throws TransactionFailureEx throw new TransactionFailureException( Status.General.DatabaseUnavailable, "Database had become unavailable while waiting for requested version %d.", oldestAcceptableTxId ); } - return oldestAcceptableTxId <= transactionIdStore.getLastClosedTransactionId(); + return oldestAcceptableTxId <= transactionIdStore().getLastClosedTransactionId(); + } + + private TransactionIdStore transactionIdStore() + { + // We need to resolve this as late as possible in case the database has been restarted as part of store copy. + // This causes TransactionIdStore staleness and we could get a MetaDataStore closed exception. + // Ideally we'd fix this with some life cycle wizardry but not going to do that for now. + return transactionIdStoreSupplier.get(); } /** @@ -109,6 +118,6 @@ private boolean isReady( long oldestAcceptableTxId ) throws TransactionFailureEx */ public long newestEncounteredTxId() { - return transactionIdStore.getLastClosedTransactionId(); + return transactionIdStore().getLastClosedTransactionId(); } } diff --git a/community/kernel/src/test/java/org/neo4j/kernel/api/txtracking/TransactionIdTrackerTest.java b/community/kernel/src/test/java/org/neo4j/kernel/api/txtracking/TransactionIdTrackerTest.java index a40e822671c15..53ec2bb3501a2 100644 --- a/community/kernel/src/test/java/org/neo4j/kernel/api/txtracking/TransactionIdTrackerTest.java +++ b/community/kernel/src/test/java/org/neo4j/kernel/api/txtracking/TransactionIdTrackerTest.java @@ -22,6 +22,7 @@ import org.junit.Test; import java.time.Clock; +import java.util.function.Supplier; import org.neo4j.kernel.AvailabilityGuard; import org.neo4j.kernel.api.exceptions.Status; @@ -38,13 +39,15 @@ public class TransactionIdTrackerTest { - private final TransactionIdStore transactionIdStore = mock( TransactionIdStore.class ); + private final Supplier transactionIdStoreSupplier = mock( Supplier.class ); private final AvailabilityGuard availabilityGuard = mock( AvailabilityGuard.class ); @Test( timeout = 500 ) public void shouldAlwaysReturnIfTheRequestVersionIsBaseTxIdOrLess() throws Exception { // given + TransactionIdStore transactionIdStore = mock( TransactionIdStore.class ); + when(transactionIdStoreSupplier.get()).thenReturn( transactionIdStore ); when( transactionIdStore.getLastClosedTransactionId() ).thenReturn( -1L ); when( availabilityGuard.isAvailable() ).thenReturn( true ); TransactionIdTracker transactionIdTracker = createTracker(); @@ -60,7 +63,10 @@ public void shouldReturnIfTheVersionIsUpToDate() throws Exception { // given long version = 5L; - when( transactionIdStore.getLastClosedTransactionId() ).thenReturn( version ); + TransactionIdStore transactionIdStore = mock(TransactionIdStore.class); + when( transactionIdStoreSupplier.get() ).thenReturn( transactionIdStore ); + when( transactionIdStore.getLastClosedTransactionId()).thenReturn( version ); + when( availabilityGuard.isAvailable() ).thenReturn( true ); TransactionIdTracker transactionIdTracker = createTracker(); @@ -75,6 +81,8 @@ public void shouldTimeoutIfTheVersionIsTooHigh() throws Exception { // given long version = 5L; + TransactionIdStore transactionIdStore = mock( TransactionIdStore.class ); + when(transactionIdStoreSupplier.get()).thenReturn( transactionIdStore ); when( transactionIdStore.getLastClosedTransactionId() ).thenReturn( version ); when( availabilityGuard.isAvailable() ).thenReturn( true ); TransactionIdTracker transactionIdTracker = createTracker(); @@ -97,6 +105,8 @@ public void shouldGiveUpWaitingIfTheDatabaseIsUnavailable() throws Exception { // given long version = 5L; + TransactionIdStore transactionIdStore = mock( TransactionIdStore.class ); + when(transactionIdStoreSupplier.get()).thenReturn( transactionIdStore ); when( transactionIdStore.getLastClosedTransactionId() ).thenReturn( version ); when( availabilityGuard.isAvailable() ).thenReturn( false ); TransactionIdTracker transactionIdTracker = createTracker(); @@ -116,6 +126,6 @@ public void shouldGiveUpWaitingIfTheDatabaseIsUnavailable() throws Exception private TransactionIdTracker createTracker() { - return new TransactionIdTracker( transactionIdStore, availabilityGuard, Clock.systemUTC() ); + return new TransactionIdTracker( transactionIdStoreSupplier, availabilityGuard, Clock.systemUTC() ); } } diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/tx/CatchupPollingProcess.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/tx/CatchupPollingProcess.java index d46e1fcb7c047..a51b8cf1d2097 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/tx/CatchupPollingProcess.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/tx/CatchupPollingProcess.java @@ -47,6 +47,8 @@ import org.neo4j.logging.LogProvider; import static java.lang.String.format; + +import static org.neo4j.causalclustering.catchup.tx.CatchupPollingProcess.State.CANCELLED; import static org.neo4j.causalclustering.catchup.tx.CatchupPollingProcess.State.PANIC; import static org.neo4j.causalclustering.catchup.tx.CatchupPollingProcess.State.STORE_COPYING; import static org.neo4j.causalclustering.catchup.tx.CatchupPollingProcess.State.TX_PULLING; @@ -71,7 +73,8 @@ enum State { TX_PULLING, STORE_COPYING, - PANIC + PANIC, + CANCELLED } private final LocalDatabase localDatabase; @@ -115,6 +118,7 @@ public CatchupPollingProcess( LogProvider logProvider, LocalDatabase localDataba @Override public synchronized void start() throws Throwable { + state = TX_PULLING; timeout = timeoutService.create( TX_PULLER_TIMEOUT, txPullIntervalMillis, 0, timeout -> onTimeout() ); dbHealth = databaseHealthSupplier.get(); upToDateFuture = new CompletableFuture<>(); @@ -128,6 +132,7 @@ public Future upToDateFuture() throws InterruptedException @Override public void stop() throws Throwable { + state = CANCELLED; timeout.cancel(); } @@ -162,7 +167,7 @@ private void onTimeout() panic( e ); } - if ( state != PANIC ) + if ( state != PANIC && state != CANCELLED ) { timeout.renew(); } diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/scenarios/ReadReplicaReplicationIT.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/scenarios/ReadReplicaReplicationIT.java index 6975a2dc48b55..89e3d0e5839a5 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/scenarios/ReadReplicaReplicationIT.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/scenarios/ReadReplicaReplicationIT.java @@ -37,6 +37,7 @@ import java.util.function.BiConsumer; import java.util.function.BinaryOperator; import java.util.function.Function; +import java.util.function.Supplier; import org.neo4j.causalclustering.catchup.tx.CatchupPollingProcess; import org.neo4j.causalclustering.catchup.tx.FileCopyMonitor; @@ -462,8 +463,8 @@ public void transactionsShouldNotAppearOnTheReadReplicaWhilePollingIsPaused() th private TransactionIdTracker transactionIdTracker( GraphDatabaseAPI database ) { - TransactionIdStore transactionIdStore = - database.getDependencyResolver().resolveDependency( TransactionIdStore.class ); + Supplier transactionIdStore = + database.getDependencyResolver().provideDependency( TransactionIdStore.class ); AvailabilityGuard availabilityGuard = database.getDependencyResolver().resolveDependency( AvailabilityGuard.class ); return new TransactionIdTracker( transactionIdStore, availabilityGuard, Clock.systemUTC() ); diff --git a/integrationtests/src/test/java/org/neo4j/bolt/BoltFailuresIT.java b/integrationtests/src/test/java/org/neo4j/bolt/BoltFailuresIT.java index 891a9b2768b95..b19dedb4b8053 100644 --- a/integrationtests/src/test/java/org/neo4j/bolt/BoltFailuresIT.java +++ b/integrationtests/src/test/java/org/neo4j/bolt/BoltFailuresIT.java @@ -32,7 +32,8 @@ import org.neo4j.driver.v1.Driver; import org.neo4j.driver.v1.GraphDatabase; import org.neo4j.driver.v1.Session; -import org.neo4j.driver.v1.exceptions.ConnectionFailureException; +import org.neo4j.driver.v1.Transaction; +import org.neo4j.driver.v1.exceptions.ServiceUnavailableException; import org.neo4j.graphdb.GraphDatabaseService; import org.neo4j.graphdb.factory.GraphDatabaseFactory; import org.neo4j.graphdb.factory.GraphDatabaseSettings; @@ -64,6 +65,7 @@ public class BoltFailuresIT private GraphDatabaseService db; private Driver driver; + private Session session; @After public void shutdownDb() @@ -72,7 +74,7 @@ public void shutdownDb() { db.shutdown(); } - IOUtils.closeAllSilently( driver ); + IOUtils.closeAllSilently( session, driver ); } @Test( timeout = TEST_TIMEOUT ) @@ -85,15 +87,19 @@ public void throwsWhenWorkerCreationFails() db = startDbWithBolt( new GraphDatabaseFactoryWithCustomBoltKernelExtension( extension ) ); driver = createDriver(); + // creating a session does not force driver to open a new connection, it opens connections + // lazily either when transaction is started or when query is executed via #run() + session = driver.session(); try { - driver.session(); + // attempt to begin a transaction to make driver create new socket connection + session.beginTransaction(); fail( "Exception expected" ); } catch ( Exception e ) { - assertThat( e, instanceOf( ConnectionFailureException.class ) ); + assertThat( e, instanceOf( ServiceUnavailableException.class ) ); } } @@ -106,15 +112,19 @@ public void throwsWhenMonitoredWorkerCreationFails() db = startDbWithBolt( new GraphDatabaseFactory().setMonitors( monitors ) ); driver = createDriver(); + // creating a session does not force driver to open a new connection, it opens connections + // lazily either when transaction is started or when query is executed via #run() + session = driver.session(); try { - driver.session(); + // attempt to begin a transaction to make driver create new socket connection + session.beginTransaction(); fail( "Exception expected" ); } catch ( Exception e ) { - assertThat( e, instanceOf( ConnectionFailureException.class ) ); + assertThat( e, instanceOf( ServiceUnavailableException.class ) ); } } @@ -155,7 +165,7 @@ public void throwsWhenRunMessageProcessingFailsToComplete() } private void throwsWhenInitMessageFails( Consumer monitorSetup, - boolean shouldBeAbleToGetSession ) + boolean shouldBeAbleToBeginTransaction ) { ThrowingSessionMonitor sessionMonitor = new ThrowingSessionMonitor(); monitorSetup.accept( sessionMonitor ); @@ -164,11 +174,12 @@ private void throwsWhenInitMessageFails( Consumer monito db = startTestDb( monitors ); driver = createDriver(); - try ( Session session = driver.session() ) + try ( Session session = driver.session(); + Transaction tx = session.beginTransaction() ) { - if ( shouldBeAbleToGetSession ) + if ( shouldBeAbleToBeginTransaction ) { - session.run( "CREATE ()" ).consume(); + tx.run( "CREATE ()" ).consume(); } else { @@ -177,7 +188,7 @@ private void throwsWhenInitMessageFails( Consumer monito } catch ( Exception e ) { - assertThat( e, instanceOf( ConnectionFailureException.class ) ); + assertThat( e, instanceOf( ServiceUnavailableException.class ) ); } } @@ -189,19 +200,24 @@ private void throwsWhenRunMessageFails( Consumer monitor db = startTestDb( monitors ); driver = createDriver(); + // open a session and start a transaction, this will force driver to obtain + // a network connection and bind it to the transaction Session session = driver.session(); + Transaction tx = session.beginTransaction(); + // at this point driver holds a valid initialize connection // setup monitor to throw before running the query to make processing of the RUN message fail monitorSetup.accept( sessionMonitor ); - session.run( "CREATE ()" ); + tx.run( "CREATE ()" ); try { + tx.close(); session.close(); fail( "Exception expected" ); } catch ( Exception e ) { - assertThat( e, instanceOf( ConnectionFailureException.class ) ); + assertThat( e, instanceOf( ServiceUnavailableException.class ) ); } } diff --git a/integrationtests/src/test/java/org/neo4j/causalclustering/scenarios/BoltCausalClusteringIT.java b/integrationtests/src/test/java/org/neo4j/causalclustering/scenarios/BoltCausalClusteringIT.java index 79a6a00436be1..f40ddb96fe09c 100644 --- a/integrationtests/src/test/java/org/neo4j/causalclustering/scenarios/BoltCausalClusteringIT.java +++ b/integrationtests/src/test/java/org/neo4j/causalclustering/scenarios/BoltCausalClusteringIT.java @@ -19,27 +19,26 @@ */ package org.neo4j.causalclustering.scenarios; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; - import java.io.File; import java.io.IOException; import java.util.HashSet; +import java.util.Map; import java.util.Set; import java.util.concurrent.TimeoutException; import java.util.function.Function; import java.util.logging.Level; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; + +import org.neo4j.causalclustering.catchup.tx.CatchupPollingProcess; import org.neo4j.causalclustering.core.CausalClusteringSettings; import org.neo4j.causalclustering.core.consensus.roles.Role; import org.neo4j.causalclustering.discovery.Cluster; -import org.neo4j.causalclustering.discovery.ClusterMember; import org.neo4j.causalclustering.discovery.CoreClusterMember; import org.neo4j.causalclustering.discovery.ReadReplica; -import org.neo4j.driver.internal.RoutingNetworkSession; import org.neo4j.driver.internal.logging.JULogging; -import org.neo4j.driver.internal.net.BoltServerAddress; import org.neo4j.driver.v1.AccessMode; import org.neo4j.driver.v1.AuthTokens; import org.neo4j.driver.v1.Config; @@ -47,26 +46,34 @@ import org.neo4j.driver.v1.GraphDatabase; import org.neo4j.driver.v1.Record; import org.neo4j.driver.v1.Session; +import org.neo4j.driver.v1.StatementResult; import org.neo4j.driver.v1.Transaction; +import org.neo4j.driver.v1.TransactionWork; import org.neo4j.driver.v1.Values; import org.neo4j.driver.v1.exceptions.ClientException; +import org.neo4j.driver.v1.exceptions.DatabaseException; import org.neo4j.driver.v1.exceptions.ServiceUnavailableException; import org.neo4j.driver.v1.exceptions.SessionExpiredException; -import org.neo4j.helpers.AdvertisedSocketAddress; +import org.neo4j.driver.v1.exceptions.TransientException; +import org.neo4j.driver.v1.summary.ServerInfo; +import org.neo4j.graphdb.factory.GraphDatabaseSettings; import org.neo4j.helpers.collection.Iterators; import org.neo4j.io.fs.FileUtils; import org.neo4j.test.causalclustering.ClusterRule; import static java.lang.String.format; import static java.util.concurrent.TimeUnit.SECONDS; -import static org.hamcrest.CoreMatchers.containsString; + import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.hamcrest.Matchers.startsWith; +import static org.hamcrest.Matchers.stringContainsInOrder; import static org.hamcrest.core.Is.is; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; + import static org.neo4j.driver.v1.Values.parameters; import static org.neo4j.helpers.collection.MapUtil.stringMap; import static org.neo4j.test.assertion.Assert.assertEventually; @@ -287,7 +294,7 @@ public void shouldPickANewServerToWriteToOnLeaderSwitch() throws Throwable LeaderSwitcher leaderSwitcher = new LeaderSwitcher( cluster ); Config config = Config.build().withLogging( new JULogging( Level.OFF ) ).toConfig(); - Set seenAddresses = new HashSet<>(); + Set seenAddresses = new HashSet<>(); try ( Driver driver = GraphDatabase .driver( leader.routingURI(), AuthTokens.basic( "neo4j", "neo4j" ), config ) ) { @@ -304,10 +311,9 @@ public void shouldPickANewServerToWriteToOnLeaderSwitch() throws Throwable try ( Session session = driver.session( AccessMode.WRITE ) ) { - BoltServerAddress boltServerAddress = ((RoutingNetworkSession) session).address(); - - session.run( "CREATE (p:Person)" ); - seenAddresses.add( boltServerAddress ); + StatementResult result = session.run( "CREATE (p:Person)" ); + ServerInfo server = result.summary().server(); + seenAddresses.add( server.address()); success = seenAddresses.size() >= 2; } catch ( Exception e ) @@ -348,43 +354,7 @@ public void sessionCreationShouldFailIfCallingDiscoveryProcedureOnEdgeServer() t catch ( ServiceUnavailableException ex ) { // then - assertEquals( format( "Server %s couldn't perform discovery", readReplica.boltAdvertisedAddress() ), - ex.getMessage() ); - } - } - - @Test - public void sessionShouldExpireOnFailingReadQuery() throws Exception - { - // given - cluster = clusterRule.withNumberOfReadReplicas( 1 ).startCluster(); - CoreClusterMember coreServer = cluster.getCoreMemberById( 0 ); - - Driver driver = GraphDatabase.driver( coreServer.routingURI(), AuthTokens.basic( "neo4j", "neo4j" ) ); - - inExpirableSession( driver, Driver::session, ( session ) -> - { - session.run( "CREATE (p:Person {name: {name} })", Values.parameters( "name", "Jim" ) ); - return null; - } ); - - try ( Session readSession = driver.session( AccessMode.READ ) ) - { - // when - connectedServer( readSession ).shutdown(); - - // then - readSession.run( "MATCH (n) RETURN n LIMIT 1" ).consume(); - fail( "Should have thrown an exception as the read replica went away mid query" ); - } - catch ( SessionExpiredException sep ) - { - // then - assertThat( sep.getMessage(), containsString( "is no longer available" ) ); - } - finally - { - driver.close(); + assertThat(ex.getMessage(), startsWith( "Failed to call 'dbms.cluster.routing.getServers' procedure on server" )); } } @@ -608,11 +578,25 @@ public void shouldSendRequestsToNewlyAddedReadReplicas() throws Throwable { for ( int i = 0; i < cluster.readReplicas().size(); i++ ) // don't care about cores { - try ( Session session = driver.session( AccessMode.READ ) ) + try ( Session session = driver.session( AccessMode.READ, bookmark ) ) { - BoltServerAddress boltServerAddress = ((RoutingNetworkSession) session).address(); executeReadQuery( bookmark, session ); - readReplicas.remove( boltServerAddress.toString() ); + + session.readTransaction( new TransactionWork() + { + @Override + public Void execute( Transaction tx ) + { + StatementResult result = tx.run( "MATCH (n:Person) RETURN COUNT(*) AS count" ); + + assertEquals( 1, result.next().get( "count" ).asInt() ); + + readReplicas.remove( result.summary().server().address() ); + + return null; + } + } ); + } catch ( Throwable throwable ) { @@ -677,7 +661,70 @@ public void shouldHandleLeaderSwitch() throws Exception } } } + } + + @Test + public void transactionsShouldNotAppearOnTheReadReplicaWhilePollingIsPaused() throws Throwable + { + // given + Map params = stringMap( GraphDatabaseSettings.keep_logical_logs.name(), "keep_none", + GraphDatabaseSettings.logical_log_rotation_threshold.name(), "1M", + GraphDatabaseSettings.check_point_interval_time.name(), "100ms", + CausalClusteringSettings.cluster_allow_reads_on_followers.name(), "false"); + + Cluster cluster = clusterRule.withSharedCoreParams( params ).withNumberOfReadReplicas( 1 ).startCluster(); + + Driver driver = GraphDatabase.driver( cluster.awaitLeader().routingURI(), AuthTokens.basic( "neo4j", "neo4j" ) ); + + try(Session session = driver.session()) + { + session.writeTransaction( ( tx ) -> + { + tx.run( "MERGE (n:Person {name: 'Jim'})" ); + return null; + } ); + } + + ReadReplica replica = cluster.findAnyReadReplica(); + + CatchupPollingProcess pollingClient = replica.database().getDependencyResolver() + .resolveDependency( CatchupPollingProcess.class ); + + pollingClient.stop(); + + String lastBookmark = null; + for ( int i = 0; i < 5; i++ ) + { + try(Session writeSession = driver.session()) + { + writeSession.writeTransaction( tx -> + { + tx.run( "UNWIND range(0, 20000) AS i CREATE (:Person)" ); + return null; + } ); + + lastBookmark = writeSession.lastBookmark(); + } + } + // when the poller is resumed, it does make it to the read replica + pollingClient.start(); + + int happyCount = 0; + int numberOfRequests = 1_000; + for ( int i = 0; i < numberOfRequests; i++ ) // don't care about cores + { + try ( Session session = driver.session( lastBookmark ) ) + { + happyCount += session.readTransaction( tx -> + { + tx.run( "MATCH (n:Person) RETURN COUNT(*) AS count" ); + return 1; + } ); + } + } + + assertEquals( numberOfRequests, happyCount ); } private void executeReadQuery( String bookmark, Session session ) @@ -710,11 +757,11 @@ private T inExpirableSession( Driver driver, Function acquir throw new TimeoutException( "Transaction did not succeed in time" ); } - private ClusterMember connectedServer( Session session ) throws NoSuchFieldException, IllegalAccessException - { - BoltServerAddress address = ((RoutingNetworkSession) session).address(); - return cluster.getMemberByBoltAddress( new AdvertisedSocketAddress( address.host(), address.port() ) ); - } +// private ClusterMember connectedServer( Session session ) throws NoSuchFieldException, IllegalAccessException +// { +// BoltServerAddress address = ((RoutingNetworkSession) session).address(); +// return cluster.getMemberByBoltAddress( new AdvertisedSocketAddress( address.host(), address.port() ) ); +// } private void switchLeader( CoreClusterMember initialLeader ) throws InterruptedException { diff --git a/pom.xml b/pom.xml index 24b5021ce4a2b..ca2711b1da070 100644 --- a/pom.xml +++ b/pom.xml @@ -451,7 +451,7 @@ org.neo4j.driver neo4j-java-driver - 1.1.0-M06 + 1.2.1 test