diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/ClusterTopology.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/ClusterTopology.java index c749dbeb3a005..2f9bbc9e84a61 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/ClusterTopology.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/ClusterTopology.java @@ -37,7 +37,7 @@ public ClusterTopology( CoreTopology coreTopology, ReadReplicaTopology readRepli public Optional find( MemberId upstream ) { Optional coreAddresses = coreTopology.find( upstream ); - Optional readReplicaAddresses = readReplicaTopology.find( upstream ); + Optional readReplicaAddresses = readReplicaTopology.findAddressFor( upstream ); if ( coreAddresses.isPresent() ) { diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/CoreTopology.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/CoreTopology.java index 23e2b29043b87..610f941efa6b6 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/CoreTopology.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/CoreTopology.java @@ -100,14 +100,7 @@ private Difference asDifference( CoreTopology topology, MemberId memberId ) public Optional anyCoreMemberId() { - if ( coreMembers.keySet().size() == 0 ) - { - return Optional.empty(); - } - else - { return coreMembers.keySet().stream().findAny(); - } } class TopologyDifference diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/ReadReplicaTopology.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/ReadReplicaTopology.java index 7ab0497b598ff..0d9c8e7e6720e 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/ReadReplicaTopology.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/ReadReplicaTopology.java @@ -41,12 +41,12 @@ public ReadReplicaTopology( Map readReplicaMember this.readReplicaMembers = readReplicaMembers; } - public Collection members() + public Collection addresses() { return readReplicaMembers.values(); } - public Optional find( MemberId memberId ) + Optional findAddressFor( MemberId memberId ) { return Optional.ofNullable( readReplicaMembers.get( memberId ) ); } diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/procedures/ClusterOverviewProcedure.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/procedures/ClusterOverviewProcedure.java index 893925bd1c0d0..c3f74fabf0ea4 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/procedures/ClusterOverviewProcedure.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/procedures/ClusterOverviewProcedure.java @@ -99,7 +99,7 @@ public RawIterator apply( Context ctx, Object[] inp log.debug( "No Address found for " + memberId ); } } - for ( ReadReplicaAddresses readReplicaAddresses : discoveryService.readReplicas().members() ) + for ( ReadReplicaAddresses readReplicaAddresses : discoveryService.readReplicas().addresses() ) { endpoints.add( new ReadWriteEndPoint( readReplicaAddresses.connectors(), Role.READ_REPLICA ) ); } diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/identity/MemberId.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/identity/MemberId.java index 84dda0ecd2f67..40f7a253ade74 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/identity/MemberId.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/identity/MemberId.java @@ -38,7 +38,7 @@ public MemberId( UUID uuid ) { Objects.requireNonNull( uuid ); this.uuid = uuid; - shortName = uuid.toString();//.substring( 0, 8 ); + shortName = uuid.toString().substring( 0, 8 ); } public UUID getUuid() diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/load_balancing/procedure/GetServersProcedureV1.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/load_balancing/procedure/GetServersProcedureV1.java index b7287df046f94..6e2a4166b69d4 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/load_balancing/procedure/GetServersProcedureV1.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/load_balancing/procedure/GetServersProcedureV1.java @@ -130,7 +130,7 @@ private List writeEndpoints() private List readEndpoints() { - List readReplicas = discoveryService.readReplicas().members().stream() + List readReplicas = discoveryService.readReplicas().addresses().stream() .map( extractBoltAddress() ).collect( toList() ); boolean addFollowers = readReplicas.isEmpty() || config.get( cluster_allow_reads_on_followers ); Stream readCore = addFollowers ? coreReadEndPoints() : Stream.empty(); diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/load_balancing/strategy/AllServersStrategy.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/load_balancing/strategy/AllServersStrategy.java index 4bb6b23a664d6..f88f5b3d0acaa 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/load_balancing/strategy/AllServersStrategy.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/load_balancing/strategy/AllServersStrategy.java @@ -94,7 +94,7 @@ private List writeEndpoints( CoreTopology cores ) private List readEndpoints( CoreTopology cores, ReadReplicaTopology readers ) { - return concat( readers.members().stream(), cores.addresses().stream() ) + return concat( readers.addresses().stream(), cores.addresses().stream() ) .map( extractBoltAddress() ) .map( Endpoint::read ) .collect( Collectors.toList() ); diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/TypicallyConnectToRandomReadReplica.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/TypicallyConnectToRandomReadReplica.java index e3b32cb75a0d6..93008bc90101a 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/TypicallyConnectToRandomReadReplica.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/TypicallyConnectToRandomReadReplica.java @@ -55,21 +55,13 @@ private static class ModuloCounter ModuloCounter( int modulo ) { // e.g. every 10th means 0-9 - this.modulo = modulo -1; + this.modulo = modulo - 1; } boolean shouldReturnCoreMemberId() { - if ( counter == modulo ) - { - counter = 0; - return true; - } - else - { - counter++; - return false; - } + counter = (counter + 1) % modulo; + return counter == 0; } } } diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/UpstreamDatabaseStrategySelector.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/UpstreamDatabaseStrategySelector.java index 3d926822e3b7b..5be7b0d040906 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/UpstreamDatabaseStrategySelector.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/UpstreamDatabaseStrategySelector.java @@ -54,7 +54,7 @@ public class UpstreamDatabaseStrategySelector public MemberId bestUpstreamDatabase() throws UpstreamDatabaseSelectionException { MemberId result = null; - for ( UpstreamDatabaseSelectionStrategy strategy : this.strategies ) + for ( UpstreamDatabaseSelectionStrategy strategy : strategies ) { try { diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/backup/ClusterSeedingIT.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/backup/ClusterSeedingIT.java index 0a987220e472a..74f1ab6517253 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/backup/ClusterSeedingIT.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/backup/ClusterSeedingIT.java @@ -46,7 +46,7 @@ import static org.neo4j.backup.OnlineBackupCommandIT.runBackupToolFromOtherJvmToGetExitCode; import static org.neo4j.causalclustering.backup.BackupCoreIT.backupAddress; import static org.neo4j.causalclustering.discovery.Cluster.dataMatchesEventually; -import static org.neo4j.causalclustering.helpers.DataCreator.createNodes; +import static org.neo4j.causalclustering.helpers.DataCreator.createEmptyNodes; public class ClusterSeedingIT { @@ -156,7 +156,7 @@ public void shouldSeedNewMemberFromNonEmptyIdleCluster() throws Throwable cluster = new Cluster( testDir.directory( "cluster-b" ), 3, 0, new SharedDiscoveryService(), emptyMap(), backupParams(), emptyMap(), emptyMap(), Standard.LATEST_NAME ); cluster.start(); - createNodes( cluster, 100 ); + createEmptyNodes( cluster, 100 ); // when: creating a backup File backupDir = createBackup( cluster.getCoreMemberById( 0 ).database(), "the-backup" ); diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/helpers/DataCreator.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/helpers/DataCreator.java index bc8cdd77ede81..03deb5d2720b9 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/helpers/DataCreator.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/helpers/DataCreator.java @@ -19,12 +19,33 @@ */ package org.neo4j.causalclustering.helpers; +import java.util.function.Supplier; + import org.neo4j.causalclustering.discovery.Cluster; import org.neo4j.causalclustering.discovery.CoreClusterMember; +import org.neo4j.graphdb.Label; +import org.neo4j.graphdb.Node; +import org.neo4j.helpers.collection.Pair; public class DataCreator { - public static CoreClusterMember createNodes( Cluster cluster, int numberOfNodes ) throws Exception + public static CoreClusterMember createLabelledNodesWithProperty( Cluster cluster, int numberOfNodes, + Label label, Supplier> supplier ) throws Exception + { + CoreClusterMember last = null; + for ( int i = 0; i < numberOfNodes; i++ ) + { + last = cluster.coreTx( ( db, tx ) -> + { + Node node = db.createNode( label ); + node.setProperty( supplier.get().first(), supplier.get().other() ); + tx.success(); + } ); + } + return last; + } + + public static CoreClusterMember createEmptyNodes( Cluster cluster, int numberOfNodes ) throws Exception { CoreClusterMember last = null; for ( int i = 0; i < numberOfNodes; i++ ) diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/scenarios/ReadReplicaToReadReplicaCatchupIT.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/scenarios/ReadReplicaToReadReplicaCatchupIT.java index 211934bf37267..eb3bdbcf7eb62 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/scenarios/ReadReplicaToReadReplicaCatchupIT.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/scenarios/ReadReplicaToReadReplicaCatchupIT.java @@ -38,21 +38,22 @@ import org.neo4j.causalclustering.readreplica.UpstreamDatabaseSelectionStrategy; import org.neo4j.function.ThrowingSupplier; import org.neo4j.graphdb.GraphDatabaseService; -import org.neo4j.graphdb.Label; import org.neo4j.graphdb.Node; import org.neo4j.graphdb.Transaction; import org.neo4j.helpers.Service; +import org.neo4j.helpers.collection.Pair; import org.neo4j.kernel.impl.api.scan.LabelScanStoreProvider; import org.neo4j.kernel.monitoring.Monitors; import org.neo4j.test.causalclustering.ClusterRule; -import static java.lang.String.format; import static java.util.concurrent.TimeUnit.MINUTES; import static org.hamcrest.CoreMatchers.startsWith; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.core.Is.is; +import static org.neo4j.causalclustering.helpers.DataCreator.createLabelledNodesWithProperty; import static org.neo4j.causalclustering.scenarios.ReadReplicaToReadReplicaCatchupIT.SpecificReplicaStrategy.upstreamFactory; import static org.neo4j.com.storecopy.StoreUtil.TEMP_COPY_DIRECTORY_NAME; +import static org.neo4j.graphdb.Label.label; import static org.neo4j.helpers.collection.Iterables.count; import static org.neo4j.test.assertion.Assert.assertEventually; @@ -73,27 +74,18 @@ public void shouldEventuallyPullTransactionAcrossReadReplicas() throws Throwable cluster.coreTx( ( db, tx ) -> { - db.schema().constraintFor( Label.label( "Foo" ) ).assertPropertyIsUnique( "foobar" ).create(); + db.schema().constraintFor( label( "Foo" ) ).assertPropertyIsUnique( "foobar" ).create(); tx.success(); } ); - storeSomeDataInCores( cluster, numberOfNodesToCreate ); + createLabelledNodesWithProperty( cluster, numberOfNodesToCreate, label( "Foo" ), + () -> Pair.of( "foobar", String.format( "baz_bat%s", UUID.randomUUID() ) ) ); - AtomicBoolean labelScanStoreCorrectlyPlaced = new AtomicBoolean( false ); - Monitors monitors = new Monitors(); - ReadReplica firstReadReplica = cluster.addReadReplicaWithIdAndMonitors( 101, monitors ); + ReadReplica firstReadReplica = cluster.addReadReplicaWithIdAndMonitors( 101, new Monitors() ); File labelScanStore = LabelScanStoreProvider .getStoreDirectory( new File( firstReadReplica.storeDir(), TEMP_COPY_DIRECTORY_NAME ) ); - monitors.addMonitorListener( (FileCopyMonitor) file -> - { - if ( file.getParent().contains( labelScanStore.getPath() ) ) - { - labelScanStoreCorrectlyPlaced.set( true ); - } - } ); - firstReadReplica.start(); checkDataHasReplicatedToReadReplicas( cluster, numberOfNodesToCreate ); @@ -125,28 +117,18 @@ public void shouldCatchUpFromCoresWhenPreferredReadReplicasAreUnavailable() thro cluster.coreTx( ( db, tx ) -> { - db.schema().constraintFor( Label.label( "Foo" ) ).assertPropertyIsUnique( "foobar" ).create(); + db.schema().constraintFor( label( "Foo" ) ).assertPropertyIsUnique( "foobar" ).create(); tx.success(); } ); - storeSomeDataInCores( cluster, numberOfNodes ); - - AtomicBoolean labelScanStoreCorrectlyPlaced = new AtomicBoolean( false ); - Monitors monitors = new Monitors(); + createLabelledNodesWithProperty( cluster, numberOfNodes, label( "Foo" ), + () -> Pair.of( "foobar", String.format( "baz_bat%s", UUID.randomUUID() ) ) ); ReadReplica firstReadReplica = - cluster.addReadReplicaWithIdAndMonitors( firstReadReplicaLocalMemberId, monitors ); + cluster.addReadReplicaWithIdAndMonitors( firstReadReplicaLocalMemberId, new Monitors() ); File labelScanStore = LabelScanStoreProvider .getStoreDirectory( new File( firstReadReplica.storeDir(), TEMP_COPY_DIRECTORY_NAME ) ); - monitors.addMonitorListener( (FileCopyMonitor) file -> - { - if ( file.getParent().contains( labelScanStore.getPath() ) ) - { - labelScanStoreCorrectlyPlaced.set( true ); - } - } ); - firstReadReplica.start(); checkDataHasReplicatedToReadReplicas( cluster, numberOfNodes ); @@ -167,39 +149,14 @@ public void shouldCatchUpFromCoresWhenPreferredReadReplicasAreUnavailable() thro // when // More transactions into core - storeSomeDataInCores( cluster, numberOfNodes ); + createLabelledNodesWithProperty( cluster, numberOfNodes, label( "Foo" ), + () -> Pair.of( "foobar", String.format( "baz_bat%s", UUID.randomUUID() ) ) ); // then // reached second read replica from cores checkDataHasReplicatedToReadReplicas( cluster, numberOfNodes * 2 ); } - private void storeSomeDataInCores( Cluster cluster, int numberOfNodes ) throws Exception - { - for ( int i = 0; i < numberOfNodes; i++ ) - { - cluster.coreTx( ( db, tx ) -> - { - createData( db ); - tx.success(); - } ); - } - } - - private void createData( GraphDatabaseService db ) - { - Node node = db.createNode( Label.label( "Foo" ) ); - node.setProperty( "foobar", format( "baz_bat%s", UUID.randomUUID() ) ); - node.setProperty( "foobar1", format( "baz_bat%s", UUID.randomUUID() ) ); - node.setProperty( "foobar2", format( "baz_bat%s", UUID.randomUUID() ) ); - node.setProperty( "foobar3", format( "baz_bat%s", UUID.randomUUID() ) ); - node.setProperty( "foobar4", format( "baz_bat%s", UUID.randomUUID() ) ); - node.setProperty( "foobar5", format( "baz_bat%s", UUID.randomUUID() ) ); - node.setProperty( "foobar6", format( "baz_bat%s", UUID.randomUUID() ) ); - node.setProperty( "foobar7", format( "baz_bat%s", UUID.randomUUID() ) ); - node.setProperty( "foobar8", format( "baz_bat%s", UUID.randomUUID() ) ); - } - private void checkDataHasReplicatedToReadReplicas( Cluster cluster, long numberOfNodes ) throws Exception { for ( final ReadReplica server : cluster.readReplicas() ) diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/scenarios/TransactionLogRecoveryIT.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/scenarios/TransactionLogRecoveryIT.java index 2d6bb61c02248..2052b40ce20f6 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/scenarios/TransactionLogRecoveryIT.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/scenarios/TransactionLogRecoveryIT.java @@ -44,7 +44,7 @@ import static java.util.Collections.singletonList; import static org.neo4j.causalclustering.discovery.Cluster.dataMatchesEventually; -import static org.neo4j.causalclustering.helpers.DataCreator.createNodes; +import static org.neo4j.causalclustering.helpers.DataCreator.createEmptyNodes; /** * Recovery scenarios where the transaction log was only partially written. @@ -72,14 +72,14 @@ public void setup() throws Exception public void coreShouldStartAfterPartialTransactionWriteCrash() throws Exception { // given: a fully synced cluster with some data - dataMatchesEventually( createNodes( cluster, 10 ), cluster.coreMembers() ); + dataMatchesEventually( createEmptyNodes( cluster, 10 ), cluster.coreMembers() ); // when: shutting down a core CoreClusterMember core = cluster.getCoreMemberById( 0 ); core.shutdown(); // and making sure there will be something new to pull - CoreClusterMember lastWrites = createNodes( cluster, 10 ); + CoreClusterMember lastWrites = createEmptyNodes( cluster, 10 ); // and writing a partial tx writePartialTx( core.storeDir() ); @@ -95,14 +95,14 @@ public void coreShouldStartAfterPartialTransactionWriteCrash() throws Exception public void coreShouldStartWithSeedHavingPartialTransactionWriteCrash() throws Exception { // given: a fully synced cluster with some data - dataMatchesEventually( createNodes( cluster, 10 ), cluster.coreMembers() ); + dataMatchesEventually( createEmptyNodes( cluster, 10 ), cluster.coreMembers() ); // when: shutting down a core CoreClusterMember core = cluster.getCoreMemberById( 0 ); core.shutdown(); // and making sure there will be something new to pull - CoreClusterMember lastWrites = createNodes( cluster, 10 ); + CoreClusterMember lastWrites = createEmptyNodes( cluster, 10 ); // and writing a partial tx writePartialTx( core.storeDir() ); @@ -122,14 +122,14 @@ public void coreShouldStartWithSeedHavingPartialTransactionWriteCrash() throws E public void readReplicaShouldStartAfterPartialTransactionWriteCrash() throws Exception { // given: a fully synced cluster with some data - dataMatchesEventually( createNodes( cluster, 10 ), cluster.readReplicas() ); + dataMatchesEventually( createEmptyNodes( cluster, 10 ), cluster.readReplicas() ); // when: shutting down a read replica ReadReplica readReplica = cluster.getReadReplicaById( 0 ); readReplica.shutdown(); // and making sure there will be something new to pull - CoreClusterMember lastWrites = createNodes( cluster, 10 ); + CoreClusterMember lastWrites = createEmptyNodes( cluster, 10 ); dataMatchesEventually( lastWrites, cluster.coreMembers() ); // and writing a partial tx