diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/scenarios/CausalConsistencyIT.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/scenarios/CausalConsistencyIT.java
deleted file mode 100644
index 1a16e4f4d3e1..000000000000
--- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/scenarios/CausalConsistencyIT.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Copyright (c) 2002-2017 "Neo Technology,"
- * Network Engine for Objects in Lund AB [http://neotechnology.com]
- *
- * This file is part of Neo4j.
- *
- * Neo4j is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License as
- * published by the Free Software Foundation, either version 3 of the
- * License, or (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Affero General Public License for more details.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see .
- */
-package org.neo4j.causalclustering.scenarios;
-
-import org.junit.Rule;
-import org.junit.Test;
-
-import java.time.Clock;
-
-import org.neo4j.causalclustering.catchup.tx.CatchupPollingProcess;
-import org.neo4j.causalclustering.core.CoreGraphDatabase;
-import org.neo4j.causalclustering.discovery.Cluster;
-import org.neo4j.causalclustering.readreplica.ReadReplicaGraphDatabase;
-import org.neo4j.kernel.AvailabilityGuard;
-import org.neo4j.kernel.api.exceptions.TransactionFailureException;
-import org.neo4j.kernel.api.txtracking.TransactionIdTracker;
-import org.neo4j.kernel.impl.transaction.log.TransactionIdStore;
-import org.neo4j.kernel.internal.GraphDatabaseAPI;
-import org.neo4j.test.causalclustering.ClusterRule;
-
-import static java.time.Duration.ofSeconds;
-import static org.junit.Assert.fail;
-
-public class CausalConsistencyIT
-{
- @Rule
- public ClusterRule clusterRule = new ClusterRule( CausalConsistencyIT.class )
- .withNumberOfCoreMembers( 3 )
- .withNumberOfReadReplicas( 1 );
-
- @Test
- public void transactionsCommittedInTheCoreShouldAppearOnTheReadReplica() throws Exception
- {
- // given
- Cluster cluster = clusterRule.startCluster();
- cluster.coreTx( ( coreGraphDatabase, transaction ) -> {
- coreGraphDatabase.createNode();
- transaction.success();
- } );
-
- CoreGraphDatabase leaderDatabase = cluster.awaitLeader().database();
- long transactionVisibleOnLeader = transactionIdTracker( leaderDatabase ).newestEncounteredTxId();
-
- // then
- ReadReplicaGraphDatabase readReplicaGraphDatabase = cluster.findAnyReadReplica().database();
- transactionIdTracker( readReplicaGraphDatabase ).awaitUpToDate( transactionVisibleOnLeader, ofSeconds( 3 ) );
- }
-
- @Test
- public void transactionsShouldNotAppearOnTheReadReplicaWhilePollingIsPaused() throws Throwable
- {
- // given
- Cluster cluster = clusterRule.startCluster();
-
- ReadReplicaGraphDatabase readReplicaGraphDatabase = cluster.findAnyReadReplica().database();
- CatchupPollingProcess pollingClient = readReplicaGraphDatabase.getDependencyResolver()
- .resolveDependency( CatchupPollingProcess.class );
- pollingClient.stop();
-
- cluster.coreTx( ( coreGraphDatabase, transaction ) -> {
- coreGraphDatabase.createNode();
- transaction.success();
- } );
-
- CoreGraphDatabase leaderDatabase = cluster.awaitLeader().database();
- long transactionVisibleOnLeader = transactionIdTracker( leaderDatabase ).newestEncounteredTxId();
-
- // when the poller is paused, transaction doesn't make it to the read replica
- try
- {
- transactionIdTracker( readReplicaGraphDatabase ).awaitUpToDate( transactionVisibleOnLeader, ofSeconds( 3 ) );
- fail( "should have thrown exception" );
- }
- catch ( TransactionFailureException e )
- {
- // expected timeout
- }
-
- // when the poller is resumed, it does make it to the read replica
- pollingClient.start();
- transactionIdTracker( readReplicaGraphDatabase ).awaitUpToDate( transactionVisibleOnLeader, ofSeconds( 3 ) );
- }
-
- private TransactionIdTracker transactionIdTracker( GraphDatabaseAPI database )
- {
- TransactionIdStore transactionIdStore =
- database.getDependencyResolver().resolveDependency( TransactionIdStore.class );
- AvailabilityGuard availabilityGuard =
- database.getDependencyResolver().resolveDependency( AvailabilityGuard.class );
- return new TransactionIdTracker( transactionIdStore, availabilityGuard, Clock.systemUTC() );
- }
-}
diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/scenarios/ReadReplicaReplicationIT.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/scenarios/ReadReplicaReplicationIT.java
index cefdab022332..3e555d45624b 100644
--- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/scenarios/ReadReplicaReplicationIT.java
+++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/scenarios/ReadReplicaReplicationIT.java
@@ -26,6 +26,7 @@
import java.io.IOException;
import java.nio.file.Path;
import java.nio.file.Paths;
+import java.time.Clock;
import java.util.Collection;
import java.util.HashSet;
import java.util.Map;
@@ -37,6 +38,7 @@
import java.util.function.BinaryOperator;
import java.util.function.Function;
+import org.neo4j.causalclustering.catchup.tx.CatchupPollingProcess;
import org.neo4j.causalclustering.catchup.tx.FileCopyMonitor;
import org.neo4j.causalclustering.core.CausalClusteringSettings;
import org.neo4j.causalclustering.core.CoreGraphDatabase;
@@ -59,7 +61,10 @@
import org.neo4j.io.fs.DefaultFileSystemAbstraction;
import org.neo4j.io.fs.FileSystemAbstraction;
import org.neo4j.io.pagecache.PageCache;
+import org.neo4j.kernel.AvailabilityGuard;
+import org.neo4j.kernel.api.exceptions.TransactionFailureException;
import org.neo4j.kernel.api.labelscan.LabelScanStore;
+import org.neo4j.kernel.api.txtracking.TransactionIdTracker;
import org.neo4j.kernel.impl.factory.GraphDatabaseFacade;
import org.neo4j.kernel.impl.store.MetaDataStore;
import org.neo4j.kernel.impl.store.format.highlimit.HighLimit;
@@ -74,6 +79,7 @@
import org.neo4j.test.DbRepresentation;
import org.neo4j.test.causalclustering.ClusterRule;
+import static java.time.Duration.ofSeconds;
import static java.util.concurrent.TimeUnit.MINUTES;
import static java.util.concurrent.TimeUnit.SECONDS;
import static java.util.stream.Collectors.toSet;
@@ -101,8 +107,8 @@
public class ReadReplicaReplicationIT
{
// This test is extended in the blockdevice repository, and these constants are required there as well.
- public static final int NR_CORE_MEMBERS = 3;
- public static final int NR_READ_REPLICAS = 1;
+ private static final int NR_CORE_MEMBERS = 3;
+ private static final int NR_READ_REPLICAS = 1;
@Rule
public final ClusterRule clusterRule = new ClusterRule( getClass() ).withNumberOfCoreMembers( NR_CORE_MEMBERS )
@@ -171,10 +177,7 @@ public void shouldEventuallyPullTransactionDownToAllReadReplicas() throws Except
}
Set labelScanStoreFiles = new HashSet<>();
- cluster.coreTx( ( db, tx ) ->
- {
- gatherLabelScanStoreFiles( db, labelScanStoreFiles );
- } );
+ cluster.coreTx( ( db, tx ) -> gatherLabelScanStoreFiles( db, labelScanStoreFiles ) );
AtomicBoolean labelScanStoreCorrectlyPlaced = new AtomicBoolean( false );
Monitors monitors = new Monitors();
@@ -420,6 +423,50 @@ public void shouldBeAbleToPullTxAfterHavingDownloadedANewStoreAfterPruning() thr
equalTo( DbRepresentation.of( cluster.awaitLeader().database() ) ), 10, TimeUnit.SECONDS );
}
+ @Test
+ public void transactionsShouldNotAppearOnTheReadReplicaWhilePollingIsPaused() throws Throwable
+ {
+ // given
+ Cluster cluster = clusterRule.startCluster();
+
+ ReadReplicaGraphDatabase readReplicaGraphDatabase = cluster.findAnyReadReplica().database();
+ CatchupPollingProcess pollingClient = readReplicaGraphDatabase.getDependencyResolver()
+ .resolveDependency( CatchupPollingProcess.class );
+ pollingClient.stop();
+
+ cluster.coreTx( ( coreGraphDatabase, transaction ) -> {
+ coreGraphDatabase.createNode();
+ transaction.success();
+ } );
+
+ CoreGraphDatabase leaderDatabase = cluster.awaitLeader().database();
+ long transactionVisibleOnLeader = transactionIdTracker( leaderDatabase ).newestEncounteredTxId();
+
+ // when the poller is paused, transaction doesn't make it to the read replica
+ try
+ {
+ transactionIdTracker( readReplicaGraphDatabase ).awaitUpToDate( transactionVisibleOnLeader, ofSeconds( 3 ) );
+ fail( "should have thrown exception" );
+ }
+ catch ( TransactionFailureException e )
+ {
+ // expected timeout
+ }
+
+ // when the poller is resumed, it does make it to the read replica
+ pollingClient.start();
+ transactionIdTracker( readReplicaGraphDatabase ).awaitUpToDate( transactionVisibleOnLeader, ofSeconds( 3 ) );
+ }
+
+ private TransactionIdTracker transactionIdTracker( GraphDatabaseAPI database )
+ {
+ TransactionIdStore transactionIdStore =
+ database.getDependencyResolver().resolveDependency( TransactionIdStore.class );
+ AvailabilityGuard availabilityGuard =
+ database.getDependencyResolver().resolveDependency( AvailabilityGuard.class );
+ return new TransactionIdTracker( transactionIdStore, availabilityGuard, Clock.systemUTC() );
+ }
+
private PhysicalLogFiles physicalLogFiles( ClusterMember clusterMember )
{
return clusterMember.database().getDependencyResolver().resolveDependency( PhysicalLogFiles.class );