diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/CatchUpClient.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/CatchUpClient.java index b718b2117942c..02dcee514d2a5 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/CatchUpClient.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/CatchUpClient.java @@ -75,7 +75,6 @@ public T makeBlockingRequest( MemberId upstream, CatchUpRequest request, Optional catchUpAddress = discoveryService.allServers().find( upstream ).map( CatchupServerAddress::getCatchupServer ); - CatchUpChannel channel = pool.acquire( catchUpAddress.orElseThrow( () -> new CatchUpClientException( "Cannot find the target member socket address" ) ) ); diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/UpstreamDatabaseSelectionStrategy.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/UpstreamDatabaseSelectionStrategy.java index 7ee4f3dad74ca..9e577c1fc5520 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/UpstreamDatabaseSelectionStrategy.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/UpstreamDatabaseSelectionStrategy.java @@ -27,7 +27,7 @@ public abstract class UpstreamDatabaseSelectionStrategy extends Service { - protected ReadReplicaTopologyService readReplicaTopologyService; + ReadReplicaTopologyService readReplicaTopologyService; public UpstreamDatabaseSelectionStrategy( String key, String... altKeys ) { diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/UpstreamDatabaseStrategySelector.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/UpstreamDatabaseStrategySelector.java index f2b56ec5d341c..3d926822e3b7b 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/UpstreamDatabaseStrategySelector.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/UpstreamDatabaseStrategySelector.java @@ -61,8 +61,8 @@ public MemberId bestUpstreamDatabase() throws UpstreamDatabaseSelectionException if ( strategy.upstreamDatabase().isPresent() ) { result = strategy.upstreamDatabase().get(); + break; } - break; } catch ( NoSuchElementException ex ) { diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/discovery/ReadReplica.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/discovery/ReadReplica.java index 9a0c5651e9687..ab81100df8763 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/discovery/ReadReplica.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/discovery/ReadReplica.java @@ -163,7 +163,6 @@ public String directURI() return String.format( "bolt://%s", boltAdvertisedAddress ); } - public File homeDir() { return neo4jHome; diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/scenarios/ReadReplicaToReadReplicaCatchupIT.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/scenarios/ReadReplicaToReadReplicaCatchupIT.java index 09156733047f9..211934bf37267 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/scenarios/ReadReplicaToReadReplicaCatchupIT.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/scenarios/ReadReplicaToReadReplicaCatchupIT.java @@ -69,7 +69,7 @@ public void shouldEventuallyPullTransactionAcrossReadReplicas() throws Throwable { // given Cluster cluster = clusterRule.startCluster(); - int nodesBeforeReadReplicaStarts = 1; + int numberOfNodesToCreate = 100; cluster.coreTx( ( db, tx ) -> { @@ -77,14 +77,7 @@ public void shouldEventuallyPullTransactionAcrossReadReplicas() throws Throwable tx.success(); } ); - for ( int i = 0; i < 100; i++ ) - { - cluster.coreTx( ( db, tx ) -> - { - createData( db, nodesBeforeReadReplicaStarts ); - tx.success(); - } ); - } + storeSomeDataInCores( cluster, numberOfNodesToCreate ); AtomicBoolean labelScanStoreCorrectlyPlaced = new AtomicBoolean( false ); Monitors monitors = new Monitors(); @@ -103,22 +96,7 @@ public void shouldEventuallyPullTransactionAcrossReadReplicas() throws Throwable firstReadReplica.start(); - for ( final ReadReplica server : cluster.readReplicas() ) - { - GraphDatabaseService readReplica = server.database(); - try ( Transaction tx = readReplica.beginTx() ) - { - ThrowingSupplier nodeCount = () -> count( readReplica.getAllNodes() ); - assertEventually( "node to appear on read replica", nodeCount, is( 100L ), 1, MINUTES ); - - for ( Node node : readReplica.getAllNodes() ) - { - assertThat( node.getProperty( "foobar" ).toString(), startsWith( "baz_bat" ) ); - } - - tx.success(); - } - } + checkDataHasReplicatedToReadReplicas( cluster, numberOfNodesToCreate ); for ( CoreClusterMember coreClusterMember : cluster.coreMembers() ) { @@ -134,13 +112,103 @@ public void shouldEventuallyPullTransactionAcrossReadReplicas() throws Throwable // then + checkDataHasReplicatedToReadReplicas( cluster, numberOfNodesToCreate ); + } + + @Test + public void shouldCatchUpFromCoresWhenPreferredReadReplicasAreUnavailable() throws Throwable + { + // given + Cluster cluster = clusterRule.startCluster(); + int numberOfNodes = 1; + int firstReadReplicaLocalMemberId = 101; + + cluster.coreTx( ( db, tx ) -> + { + db.schema().constraintFor( Label.label( "Foo" ) ).assertPropertyIsUnique( "foobar" ).create(); + tx.success(); + } ); + + storeSomeDataInCores( cluster, numberOfNodes ); + + AtomicBoolean labelScanStoreCorrectlyPlaced = new AtomicBoolean( false ); + Monitors monitors = new Monitors(); + + ReadReplica firstReadReplica = + cluster.addReadReplicaWithIdAndMonitors( firstReadReplicaLocalMemberId, monitors ); + File labelScanStore = LabelScanStoreProvider + .getStoreDirectory( new File( firstReadReplica.storeDir(), TEMP_COPY_DIRECTORY_NAME ) ); + + monitors.addMonitorListener( (FileCopyMonitor) file -> + { + if ( file.getParent().contains( labelScanStore.getPath() ) ) + { + labelScanStoreCorrectlyPlaced.set( true ); + } + } ); + + firstReadReplica.start(); + + checkDataHasReplicatedToReadReplicas( cluster, numberOfNodes ); + + upstreamFactory.setCurrent( firstReadReplica ); + + ReadReplica secondReadReplica = cluster.addReadReplicaWithId( 202 ); + secondReadReplica.setUpstreamDatabaseSelectionStrategy( "specific" ); + + secondReadReplica.start(); + + checkDataHasReplicatedToReadReplicas( cluster, numberOfNodes ); + + firstReadReplica.shutdown(); + upstreamFactory.reset(); + + cluster.removeReadReplicaWithMemberId( firstReadReplicaLocalMemberId ); + + // when + // More transactions into core + storeSomeDataInCores( cluster, numberOfNodes ); + + // then + // reached second read replica from cores + checkDataHasReplicatedToReadReplicas( cluster, numberOfNodes * 2 ); + } + + private void storeSomeDataInCores( Cluster cluster, int numberOfNodes ) throws Exception + { + for ( int i = 0; i < numberOfNodes; i++ ) + { + cluster.coreTx( ( db, tx ) -> + { + createData( db ); + tx.success(); + } ); + } + } + + private void createData( GraphDatabaseService db ) + { + Node node = db.createNode( Label.label( "Foo" ) ); + node.setProperty( "foobar", format( "baz_bat%s", UUID.randomUUID() ) ); + node.setProperty( "foobar1", format( "baz_bat%s", UUID.randomUUID() ) ); + node.setProperty( "foobar2", format( "baz_bat%s", UUID.randomUUID() ) ); + node.setProperty( "foobar3", format( "baz_bat%s", UUID.randomUUID() ) ); + node.setProperty( "foobar4", format( "baz_bat%s", UUID.randomUUID() ) ); + node.setProperty( "foobar5", format( "baz_bat%s", UUID.randomUUID() ) ); + node.setProperty( "foobar6", format( "baz_bat%s", UUID.randomUUID() ) ); + node.setProperty( "foobar7", format( "baz_bat%s", UUID.randomUUID() ) ); + node.setProperty( "foobar8", format( "baz_bat%s", UUID.randomUUID() ) ); + } + + private void checkDataHasReplicatedToReadReplicas( Cluster cluster, long numberOfNodes ) throws Exception + { for ( final ReadReplica server : cluster.readReplicas() ) { GraphDatabaseService readReplica = server.database(); try ( Transaction tx = readReplica.beginTx() ) { ThrowingSupplier nodeCount = () -> count( readReplica.getAllNodes() ); - assertEventually( "node to appear on read replica", nodeCount, is( 100L ), 1, MINUTES ); + assertEventually( "node to appear on read replica", nodeCount, is( numberOfNodes ), 1, MINUTES ); for ( Node node : readReplica.getAllNodes() ) { @@ -152,41 +220,29 @@ public void shouldEventuallyPullTransactionAcrossReadReplicas() throws Throwable } } - private void createData( GraphDatabaseService db, int amount ) - { - for ( int i = 0; i < amount; i++ ) - { - Node node = db.createNode( Label.label( "Foo" ) ); - node.setProperty( "foobar", format( "baz_bat%s", UUID.randomUUID() ) ); - node.setProperty( "foobar1", format( "baz_bat%s", UUID.randomUUID() ) ); - node.setProperty( "foobar2", format( "baz_bat%s", UUID.randomUUID() ) ); - node.setProperty( "foobar3", format( "baz_bat%s", UUID.randomUUID() ) ); - node.setProperty( "foobar4", format( "baz_bat%s", UUID.randomUUID() ) ); - node.setProperty( "foobar5", format( "baz_bat%s", UUID.randomUUID() ) ); - node.setProperty( "foobar6", format( "baz_bat%s", UUID.randomUUID() ) ); - node.setProperty( "foobar7", format( "baz_bat%s", UUID.randomUUID() ) ); - node.setProperty( "foobar8", format( "baz_bat%s", UUID.randomUUID() ) ); - } - } - @Service.Implementation( UpstreamDatabaseSelectionStrategy.class ) public static class SpecificReplicaStrategy extends UpstreamDatabaseSelectionStrategy { // This because we need a stable point for config to inject into Service loader loaded classes static final UpstreamFactory upstreamFactory = new UpstreamFactory(); - private ReadReplica upstream; - public SpecificReplicaStrategy() { super( "specific" ); - this.upstream = upstreamFactory.current(); } @Override public Optional upstreamDatabase() throws UpstreamDatabaseSelectionException { - return upstream.memberId(); + ReadReplica current = upstreamFactory.current(); + if ( current == null ) + { + return Optional.empty(); + } + else + { + return current.memberId(); + } } } @@ -203,5 +259,10 @@ public ReadReplica current() { return current; } + + void reset() + { + current = null; + } } }