diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/tx/CatchupPollingProcess.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/tx/CatchupPollingProcess.java
index 4e309a2beaaf7..d46e1fcb7c047 100644
--- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/tx/CatchupPollingProcess.java
+++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/tx/CatchupPollingProcess.java
@@ -239,7 +239,7 @@ private boolean pullAndApplyBatchOfTransactions( MemberId upstream, StoreId loca
long lastQueuedTxId = applier.lastQueuedTxId();
pullRequestMonitor.txPullRequest( lastQueuedTxId );
TxPullRequest txPullRequest = new TxPullRequest( lastQueuedTxId, localStoreId );
- log.debug( "Pull transactions where tx id > %d [batch #%d]", lastQueuedTxId, batchCount );
+ log.debug( "Pull transactions from %s where tx id > %d [batch #%d]", upstream, lastQueuedTxId, batchCount );
TxStreamFinishedResponse response;
try
@@ -275,7 +275,7 @@ public void onTxStreamFinishedResponse( CompletableFuture.
+ */
+package org.neo4j.causalclustering.readreplica;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Random;
+import java.util.stream.Collectors;
+
+import org.neo4j.causalclustering.core.CausalClusteringSettings;
+import org.neo4j.causalclustering.discovery.ReadReplicaInfo;
+import org.neo4j.causalclustering.identity.MemberId;
+import org.neo4j.helpers.Service;
+
+@Service.Implementation(UpstreamDatabaseSelectionStrategy.class)
+public class ConnectWithinDataCenter extends UpstreamDatabaseSelectionStrategy
+{
+ private Random random = new Random();;
+
+ public ConnectWithinDataCenter()
+ {
+ super( "connect-within-data-center" );
+ }
+
+ @Override
+ public Optional upstreamDatabase() throws UpstreamDatabaseSelectionException
+ {
+ Map replicas = topologyService.readReplicas().replicaMembers();
+
+ List tags = config.get( CausalClusteringSettings.server_tags );
+ if ( tags.isEmpty() )
+ {
+ return Optional.empty();
+ }
+
+ String myTag = tags.get( 0 );
+
+ List> choices = replicas.entrySet().stream()
+ .filter( entry -> entry.getValue().tags().contains( myTag ) && !entry.getKey().equals( myself ) )
+ .collect( Collectors.toList() );
+
+ if ( choices.isEmpty() )
+ {
+ return Optional.empty();
+ }
+ else
+ {
+ return Optional.of( choices.get( random.nextInt( choices.size() ) ).getKey() );
+ }
+ }
+}
diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/EnterpriseReadReplicaEditionModule.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/EnterpriseReadReplicaEditionModule.java
index 2de4e74fc1b3f..57290a74065b1 100644
--- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/EnterpriseReadReplicaEditionModule.java
+++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/EnterpriseReadReplicaEditionModule.java
@@ -248,10 +248,12 @@ private OnlineBackupKernelExtension pickBackupExtension( NeoStoreDataSource data
ConnectToRandomCoreServer defaultStrategy = new ConnectToRandomCoreServer();
defaultStrategy.setTopologyService( topologyService );
+ defaultStrategy.setConfig( config );
+ defaultStrategy.setMyself( myself );
UpstreamDatabaseStrategySelector upstreamDatabaseStrategySelector =
new UpstreamDatabaseStrategySelector( defaultStrategy,
- new UpstreamDatabaseStrategiesLoader( topologyService, config ), myself );
+ new UpstreamDatabaseStrategiesLoader( topologyService, config, myself, logProvider ), myself, logProvider );
CatchupPollingProcess catchupProcess =
new CatchupPollingProcess( logProvider, localDatabase, servicesToStopOnStoreCopy, catchUpClient,
diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/UpstreamDatabaseSelectionStrategy.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/UpstreamDatabaseSelectionStrategy.java
index 21163b306f425..a37b26a290a0a 100644
--- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/UpstreamDatabaseSelectionStrategy.java
+++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/UpstreamDatabaseSelectionStrategy.java
@@ -24,10 +24,13 @@
import org.neo4j.causalclustering.discovery.TopologyService;
import org.neo4j.causalclustering.identity.MemberId;
import org.neo4j.helpers.Service;
+import org.neo4j.kernel.configuration.Config;
public abstract class UpstreamDatabaseSelectionStrategy extends Service
{
- TopologyService topologyService;
+ protected TopologyService topologyService;
+ protected Config config;
+ protected MemberId myself;
public UpstreamDatabaseSelectionStrategy( String key, String... altKeys )
{
@@ -40,5 +43,37 @@ void setTopologyService( TopologyService topologyService )
this.topologyService = topologyService;
}
+ void setConfig( Config config )
+ {
+ this.config = config;
+ }
+
+ void setMyself(MemberId myself) { this.myself = myself;}
+
public abstract Optional upstreamDatabase() throws UpstreamDatabaseSelectionException;
+
+ @Override
+ public String toString()
+ {
+ return nicelyCommaSeparatedList( getKeys() );
+ }
+
+ private static String nicelyCommaSeparatedList( Iterable keys )
+ {
+ StringBuilder sb = new StringBuilder();
+ for ( String key : keys )
+ {
+ sb.append( key );
+ sb.append( "," );
+ sb.append( " " );
+ }
+
+ int trimThese = sb.lastIndexOf( ", " );
+ if ( trimThese > 1 )
+ {
+ sb.replace( trimThese, sb.length(), "" );
+ }
+
+ return sb.toString();
+ }
}
diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/UpstreamDatabaseStrategiesLoader.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/UpstreamDatabaseStrategiesLoader.java
index 4e8227ad3115a..f75b17e380057 100644
--- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/UpstreamDatabaseStrategiesLoader.java
+++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/UpstreamDatabaseStrategiesLoader.java
@@ -19,13 +19,17 @@
*/
package org.neo4j.causalclustering.readreplica;
+import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedHashSet;
import org.neo4j.causalclustering.core.CausalClusteringSettings;
import org.neo4j.causalclustering.discovery.TopologyService;
+import org.neo4j.causalclustering.identity.MemberId;
import org.neo4j.helpers.Service;
import org.neo4j.kernel.configuration.Config;
+import org.neo4j.logging.Log;
+import org.neo4j.logging.LogProvider;
/**
* Loads and initialises any service implementations of UpstreamDatabaseSelectionStrategy.
@@ -35,11 +39,15 @@ public class UpstreamDatabaseStrategiesLoader implements Iterable iterator()
if ( candidate.getKeys().iterator().next().equals( key ) )
{
candidate.setTopologyService( topologyService );
+ candidate.setConfig( config );
+ candidate.setMyself( myself );
candidates.add( candidate );
}
}
}
+
+ log( candidates );
+
return candidates.iterator();
}
+
+ private void log( LinkedHashSet candidates )
+ {
+ log.debug( "Upstream database strategies loaded in order of precedence: " +
+ nicelyCommaSeparatedList( candidates ) );
+ }
+
+ private static String nicelyCommaSeparatedList( Collection items )
+ {
+ StringBuilder sb = new StringBuilder();
+ for ( UpstreamDatabaseSelectionStrategy strategy : items )
+ {
+ sb.append( strategy.toString() );
+ sb.append( "," );
+ sb.append( " " );
+ }
+
+ int trimThese = sb.lastIndexOf( ", " );
+ if ( trimThese > 1 )
+ {
+ sb.replace( trimThese, sb.length(), "" );
+ }
+
+ return sb.toString();
+ }
}
diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/UpstreamDatabaseStrategySelector.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/UpstreamDatabaseStrategySelector.java
index 5be7b0d040906..0c9e7eda54b80 100644
--- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/UpstreamDatabaseStrategySelector.java
+++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/UpstreamDatabaseStrategySelector.java
@@ -24,6 +24,9 @@
import java.util.NoSuchElementException;
import org.neo4j.causalclustering.identity.MemberId;
+import org.neo4j.logging.Log;
+import org.neo4j.logging.LogProvider;
+import org.neo4j.logging.NullLogProvider;
import static org.neo4j.helpers.collection.Iterables.empty;
@@ -31,16 +34,20 @@ public class UpstreamDatabaseStrategySelector
{
private LinkedHashSet strategies = new LinkedHashSet<>();
private MemberId myself;
+ private Log log;
UpstreamDatabaseStrategySelector( UpstreamDatabaseSelectionStrategy defaultStrategy )
{
- this( defaultStrategy, empty(), null );
+ this( defaultStrategy, empty(), null, NullLogProvider.getInstance() );
}
UpstreamDatabaseStrategySelector( UpstreamDatabaseSelectionStrategy defaultStrategy,
- Iterable otherStrategies, MemberId myself )
+ Iterable otherStrategies, MemberId myself,
+ LogProvider logProvider )
{
this.myself = myself;
+ this.log = logProvider.getLog( getClass() );
+
if ( otherStrategies != null )
{
for ( UpstreamDatabaseSelectionStrategy otherStrategy : otherStrategies )
@@ -56,6 +63,7 @@ public MemberId bestUpstreamDatabase() throws UpstreamDatabaseSelectionException
MemberId result = null;
for ( UpstreamDatabaseSelectionStrategy strategy : strategies )
{
+ log.debug( "Trying selection strategy [%s]", strategy.toString() );
try
{
if ( strategy.upstreamDatabase().isPresent() )
@@ -76,6 +84,7 @@ public MemberId bestUpstreamDatabase() throws UpstreamDatabaseSelectionException
"Could not find an upstream database with which to connect." );
}
+ log.debug( "Selected upstream database [%s]", result );
return result;
}
}
diff --git a/enterprise/causal-clustering/src/main/resources/META-INF/services/org.neo4j.causalclustering.readreplica.UpstreamDatabaseSelectionStrategy b/enterprise/causal-clustering/src/main/resources/META-INF/services/org.neo4j.causalclustering.readreplica.UpstreamDatabaseSelectionStrategy
index d1c1cddf7b9a4..19f6edf13de3d 100644
--- a/enterprise/causal-clustering/src/main/resources/META-INF/services/org.neo4j.causalclustering.readreplica.UpstreamDatabaseSelectionStrategy
+++ b/enterprise/causal-clustering/src/main/resources/META-INF/services/org.neo4j.causalclustering.readreplica.UpstreamDatabaseSelectionStrategy
@@ -1 +1,2 @@
org.neo4j.causalclustering.readreplica.ConnectToRandomCoreServer
+org.neo4j.causalclustering.readreplica.ConnectWithinDataCenter
diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/readreplica/UpstreamDatabaseStrategiesLoaderTest.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/readreplica/UpstreamDatabaseStrategiesLoaderTest.java
index 76e64df678a40..80296b551ab37 100644
--- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/readreplica/UpstreamDatabaseStrategiesLoaderTest.java
+++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/readreplica/UpstreamDatabaseStrategiesLoaderTest.java
@@ -22,9 +22,12 @@
import org.junit.Test;
import java.util.Set;
+import java.util.UUID;
import org.neo4j.causalclustering.discovery.TopologyService;
+import org.neo4j.causalclustering.identity.MemberId;
import org.neo4j.kernel.configuration.Config;
+import org.neo4j.logging.NullLogProvider;
import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.mock;
@@ -33,6 +36,9 @@
public class UpstreamDatabaseStrategiesLoaderTest
{
+
+ private MemberId myself = new MemberId( UUID.randomUUID() );
+
@Test
public void shouldReturnConfiguredClassesOnly() throws Exception
{
@@ -40,8 +46,9 @@ public void shouldReturnConfiguredClassesOnly() throws Exception
Config config = Config.defaults();
config.augment( stringMap( "causal_clustering.upstream_selection_strategy", "dummy" ) );
- UpstreamDatabaseStrategiesLoader
- strategies = new UpstreamDatabaseStrategiesLoader( mock( TopologyService.class ), config );
+ UpstreamDatabaseStrategiesLoader strategies =
+ new UpstreamDatabaseStrategiesLoader( mock( TopologyService.class ), config,
+ myself, NullLogProvider.getInstance() );
// when
Set upstreamDatabaseSelectionStrategies = asSet( strategies.iterator() );
@@ -61,8 +68,9 @@ public void shouldReturnTheFirstStrategyThatWorksFromThoseConfigured() throws Ex
stringMap( "causal_clustering.upstream_selection_strategy", "yet-another-dummy,dummy,another-dummy" ) );
// when
- UpstreamDatabaseStrategiesLoader
- strategies = new UpstreamDatabaseStrategiesLoader( mock( TopologyService.class ), config );
+ UpstreamDatabaseStrategiesLoader strategies =
+ new UpstreamDatabaseStrategiesLoader( mock( TopologyService.class ), config,
+ myself, NullLogProvider.getInstance() );
// then
assertEquals( UpstreamDatabaseStrategySelectorTest.YetAnotherDummyUpstreamDatabaseSelectionStrategy.class,
diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/readreplica/UpstreamDatabaseStrategySelectorTest.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/readreplica/UpstreamDatabaseStrategySelectorTest.java
index abc211e029d50..8bfa581fded9a 100644
--- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/readreplica/UpstreamDatabaseStrategySelectorTest.java
+++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/readreplica/UpstreamDatabaseStrategySelectorTest.java
@@ -32,6 +32,7 @@
import org.neo4j.causalclustering.identity.ClusterId;
import org.neo4j.causalclustering.identity.MemberId;
import org.neo4j.helpers.Service;
+import org.neo4j.logging.NullLogProvider;
import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.mock;
@@ -59,7 +60,7 @@ public void shouldReturnTheMemberIdFromFirstSucessfulStrategy() throws Exception
when( goodOne.upstreamDatabase() ).thenReturn( Optional.of( theMemberId ) );
UpstreamDatabaseStrategySelector selector =
- new UpstreamDatabaseStrategySelector( badOne, iterable( goodOne, anotherBadOne ), null );
+ new UpstreamDatabaseStrategySelector( badOne, iterable( goodOne, anotherBadOne ), null, NullLogProvider.getInstance() );
// when
MemberId result = selector.bestUpstreamDatabase();
@@ -104,7 +105,7 @@ public void shouldUseSpecifiedStrategyInPreferenceToDefault() throws Exception
when( mockStrategy.upstreamDatabase() ).thenReturn( Optional.of( new MemberId( UUID.randomUUID() ) ) );
UpstreamDatabaseStrategySelector selector =
- new UpstreamDatabaseStrategySelector( shouldNotUse, iterable( mockStrategy ), null );
+ new UpstreamDatabaseStrategySelector( shouldNotUse, iterable( mockStrategy ), null, NullLogProvider.getInstance() );
// when
selector.bestUpstreamDatabase();
diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/scenarios/ReadReplicaHierarchicalCatchupIT.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/scenarios/ReadReplicaHierarchicalCatchupIT.java
new file mode 100644
index 0000000000000..b6fa1b6e00256
--- /dev/null
+++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/scenarios/ReadReplicaHierarchicalCatchupIT.java
@@ -0,0 +1,132 @@
+/*
+ * Copyright (c) 2002-2017 "Neo Technology,"
+ * Network Engine for Objects in Lund AB [http://neotechnology.com]
+ *
+ * This file is part of Neo4j.
+ *
+ * Neo4j is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see .
+ */
+package org.neo4j.causalclustering.scenarios;
+
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.UUID;
+
+import org.neo4j.backup.OnlineBackupSettings;
+import org.neo4j.causalclustering.core.CausalClusteringSettings;
+import org.neo4j.causalclustering.discovery.Cluster;
+import org.neo4j.causalclustering.discovery.CoreClusterMember;
+import org.neo4j.causalclustering.discovery.HazelcastDiscoveryServiceFactory;
+import org.neo4j.causalclustering.discovery.ReadReplica;
+import org.neo4j.causalclustering.identity.MemberId;
+import org.neo4j.causalclustering.readreplica.UpstreamDatabaseSelectionException;
+import org.neo4j.causalclustering.readreplica.UpstreamDatabaseSelectionStrategy;
+import org.neo4j.function.ThrowingSupplier;
+import org.neo4j.graphdb.GraphDatabaseService;
+import org.neo4j.graphdb.Node;
+import org.neo4j.graphdb.Transaction;
+import org.neo4j.helpers.Service;
+import org.neo4j.helpers.collection.Pair;
+import org.neo4j.kernel.impl.api.scan.LabelScanStoreProvider;
+import org.neo4j.kernel.monitoring.Monitors;
+import org.neo4j.test.causalclustering.ClusterRule;
+
+import static java.util.concurrent.TimeUnit.MINUTES;
+import static org.hamcrest.CoreMatchers.startsWith;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.Is.is;
+import static org.neo4j.causalclustering.helpers.DataCreator.createLabelledNodesWithProperty;
+import static org.neo4j.causalclustering.scenarios.ReadReplicaToReadReplicaCatchupIT.SpecificReplicaStrategy.upstreamFactory;
+import static org.neo4j.causalclustering.scenarios.ReadReplicaToReadReplicaCatchupIT.checkDataHasReplicatedToReadReplicas;
+import static org.neo4j.com.storecopy.StoreUtil.TEMP_COPY_DIRECTORY_NAME;
+import static org.neo4j.graphdb.Label.label;
+import static org.neo4j.helpers.collection.Iterables.count;
+import static org.neo4j.test.assertion.Assert.assertEventually;
+
+public class ReadReplicaHierarchicalCatchupIT
+{
+ private Map serverTags = new HashMap<>();
+
+ @Before
+ public void setup()
+ {
+ serverTags.put( 0, "NORTH" );
+ serverTags.put( 1, "NORTH" );
+ serverTags.put( 2, "NORTH" );
+
+ serverTags.put( 3, "EAST" );
+ serverTags.put( 5, "EAST" );
+
+ serverTags.put( 4, "WEST" );
+ serverTags.put( 6, "WEST" );
+ }
+
+ @Rule
+ public ClusterRule clusterRule =
+ new ClusterRule( getClass() ).withNumberOfCoreMembers( 3 ).withNumberOfReadReplicas( 0 )
+ .withSharedCoreParam( CausalClusteringSettings.cluster_topology_refresh, "5s" )
+ .withDiscoveryServiceFactory( new HazelcastDiscoveryServiceFactory() );
+
+ @Test
+ public void shouldCatchupThroughHierarchy() throws Throwable
+ {
+ clusterRule = clusterRule
+ .withInstanceReadReplicaParam( CausalClusteringSettings.server_tags, id -> serverTags.get( id ) )
+ .withInstanceCoreParam( CausalClusteringSettings.server_tags, id -> serverTags.get( id ) );
+
+ // given
+ Cluster cluster = clusterRule.startCluster();
+ int numberOfNodesToCreate = 100;
+
+ cluster.coreTx( ( db, tx ) ->
+ {
+ db.schema().constraintFor( label( "Foo" ) ).assertPropertyIsUnique( "foobar" ).create();
+ tx.success();
+ } );
+
+ // 0, 1, 2 are core instances
+ createLabelledNodesWithProperty( cluster, numberOfNodesToCreate, label( "Foo" ),
+ () -> Pair.of( "foobar", String.format( "baz_bat%s", UUID.randomUUID() ) ) );
+
+ // 3, 4 are other DCs
+ ReadReplica east3 = cluster.addReadReplicaWithId( 3 );
+ east3.start();
+ ReadReplica west4 = cluster.addReadReplicaWithId( 4 );
+ west4.start();
+
+ checkDataHasReplicatedToReadReplicas( cluster, numberOfNodesToCreate );
+
+ for ( CoreClusterMember coreClusterMember : cluster.coreMembers() )
+ {
+ coreClusterMember.stopCatchupServer();
+ }
+
+ // 5, 6 are other DCs
+ ReadReplica east5 = cluster.addReadReplicaWithId( 5 );
+ east5.setUpstreamDatabaseSelectionStrategy( "connect-within-data-center" );
+ east5.start();
+ ReadReplica west6 = cluster.addReadReplicaWithId( 6 );
+ west6.setUpstreamDatabaseSelectionStrategy( "connect-within-data-center" );
+ west6.start();
+
+ checkDataHasReplicatedToReadReplicas( cluster, numberOfNodesToCreate );
+
+ }
+}
diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/scenarios/ReadReplicaToReadReplicaCatchupIT.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/scenarios/ReadReplicaToReadReplicaCatchupIT.java
index 2e685dee77b24..adfdee3335a7f 100644
--- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/scenarios/ReadReplicaToReadReplicaCatchupIT.java
+++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/scenarios/ReadReplicaToReadReplicaCatchupIT.java
@@ -81,9 +81,6 @@ public void shouldEventuallyPullTransactionAcrossReadReplicas() throws Throwable
ReadReplica firstReadReplica = cluster.addReadReplicaWithIdAndMonitors( 101, new Monitors() );
- File labelScanStore = LabelScanStoreProvider
- .getStoreDirectory( new File( firstReadReplica.storeDir(), TEMP_COPY_DIRECTORY_NAME ) );
-
firstReadReplica.start();
checkDataHasReplicatedToReadReplicas( cluster, numberOfNodesToCreate );
@@ -124,8 +121,6 @@ public void shouldCatchUpFromCoresWhenPreferredReadReplicasAreUnavailable() thro
ReadReplica firstReadReplica =
cluster.addReadReplicaWithIdAndMonitors( firstReadReplicaLocalMemberId, new Monitors() );
- File labelScanStore = LabelScanStoreProvider
- .getStoreDirectory( new File( firstReadReplica.storeDir(), TEMP_COPY_DIRECTORY_NAME ) );
firstReadReplica.start();
@@ -155,7 +150,7 @@ public void shouldCatchUpFromCoresWhenPreferredReadReplicasAreUnavailable() thro
checkDataHasReplicatedToReadReplicas( cluster, numberOfNodes * 2 );
}
- private void checkDataHasReplicatedToReadReplicas( Cluster cluster, long numberOfNodes ) throws Exception
+ static void checkDataHasReplicatedToReadReplicas( Cluster cluster, long numberOfNodes ) throws Exception
{
for ( final ReadReplica server : cluster.readReplicas() )
{
diff --git a/enterprise/metrics/src/main/java/org/neo4j/metrics/source/Neo4jMetricsBuilder.java b/enterprise/metrics/src/main/java/org/neo4j/metrics/source/Neo4jMetricsBuilder.java
index a32344a9cc308..6b18d61759ba9 100644
--- a/enterprise/metrics/src/main/java/org/neo4j/metrics/source/Neo4jMetricsBuilder.java
+++ b/enterprise/metrics/src/main/java/org/neo4j/metrics/source/Neo4jMetricsBuilder.java
@@ -39,6 +39,7 @@
import org.neo4j.kernel.monitoring.Monitors;
import org.neo4j.metrics.MetricsSettings;
import org.neo4j.metrics.output.EventReporter;
+import org.neo4j.metrics.source.causalclustering.CatchUpMetrics;
import org.neo4j.metrics.source.causalclustering.CoreMetrics;
import org.neo4j.metrics.source.causalclustering.ReadReplicaMetrics;
import org.neo4j.metrics.source.cluster.ClusterMetrics;
@@ -197,11 +198,13 @@ public boolean build()
if ( mode == OperationalMode.core )
{
life.add( new CoreMetrics( dependencies.monitors(), registry, dependencies.raft() ) );
+ life.add( new CatchUpMetrics( dependencies.monitors(), registry ) );
result = true;
}
else if ( mode == OperationalMode.read_replica )
{
life.add( new ReadReplicaMetrics( dependencies.monitors(), registry ) );
+ life.add( new CatchUpMetrics( dependencies.monitors(), registry ) );
result = true;
}
else
diff --git a/enterprise/metrics/src/main/java/org/neo4j/metrics/source/causalclustering/CatchUpMetrics.java b/enterprise/metrics/src/main/java/org/neo4j/metrics/source/causalclustering/CatchUpMetrics.java
new file mode 100644
index 0000000000000..e7af5b414e09c
--- /dev/null
+++ b/enterprise/metrics/src/main/java/org/neo4j/metrics/source/causalclustering/CatchUpMetrics.java
@@ -0,0 +1,66 @@
+/*
+ * Copyright (c) 2002-2017 "Neo Technology,"
+ * Network Engine for Objects in Lund AB [http://neotechnology.com]
+ *
+ * This file is part of Neo4j.
+ *
+ * Neo4j is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see .
+ */
+package org.neo4j.metrics.source.causalclustering;
+
+import java.io.IOException;
+import java.util.function.Supplier;
+
+import com.codahale.metrics.Gauge;
+import com.codahale.metrics.MetricRegistry;
+
+import org.neo4j.causalclustering.core.consensus.CoreMetaData;
+import org.neo4j.kernel.impl.annotations.Documented;
+import org.neo4j.kernel.lifecycle.LifecycleAdapter;
+import org.neo4j.kernel.monitoring.Monitors;
+
+import static com.codahale.metrics.MetricRegistry.name;
+
+@Documented(".CatchUp Metrics")
+public class CatchUpMetrics extends LifecycleAdapter
+{
+ private static final String CAUSAL_CLUSTERING_PREFIX = "neo4j.causal_clustering.catchup";
+
+ @Documented("TX pull requests received from read replicas")
+ public static final String TX_PULL_REQUESTS_RECEIVED = name( CAUSAL_CLUSTERING_PREFIX, "tx_pull_requests_received" );
+
+ private Monitors monitors;
+ private MetricRegistry registry;
+ private final TxPullRequestsMetric txPullRequestsMetric = new TxPullRequestsMetric();
+
+ public CatchUpMetrics( Monitors monitors, MetricRegistry registry )
+ {
+ this.monitors = monitors;
+ this.registry = registry;
+ }
+
+ @Override
+ public void start() throws Throwable
+ {
+ monitors.addMonitorListener( txPullRequestsMetric );
+ registry.register( TX_PULL_REQUESTS_RECEIVED, (Gauge) txPullRequestsMetric::txPullRequestsReceived );
+ }
+
+ @Override
+ public void stop() throws IOException
+ {
+ registry.remove( TX_PULL_REQUESTS_RECEIVED );
+ monitors.removeMonitorListener( txPullRequestsMetric );
+ }
+}
diff --git a/enterprise/metrics/src/main/java/org/neo4j/metrics/source/causalclustering/CoreMetrics.java b/enterprise/metrics/src/main/java/org/neo4j/metrics/source/causalclustering/CoreMetrics.java
index fc46add3c91f1..6444e037c88f1 100644
--- a/enterprise/metrics/src/main/java/org/neo4j/metrics/source/causalclustering/CoreMetrics.java
+++ b/enterprise/metrics/src/main/java/org/neo4j/metrics/source/causalclustering/CoreMetrics.java
@@ -45,8 +45,6 @@ public class CoreMetrics extends LifecycleAdapter
public static final String TERM = name( CAUSAL_CLUSTERING_PREFIX, "term" );
@Documented("Leader was not found while attempting to commit a transaction")
public static final String LEADER_NOT_FOUND = name( CAUSAL_CLUSTERING_PREFIX, "leader_not_found" );
- @Documented("TX pull requests received from read replicas")
- public static final String TX_PULL_REQUESTS_RECEIVED = name( CAUSAL_CLUSTERING_PREFIX, "tx_pull_requests_received" );
@Documented("Transaction retries")
public static final String TX_RETRIES = name( CAUSAL_CLUSTERING_PREFIX, "tx_retries" );
@Documented("Is this server the leader?")
@@ -90,7 +88,6 @@ public void start() throws Throwable
registry.register( APPEND_INDEX, (Gauge) raftLogAppendIndexMetric::appendIndex );
registry.register( TERM, (Gauge) raftTermMetric::term );
registry.register( LEADER_NOT_FOUND, (Gauge) leaderNotFoundMetric::leaderNotFoundExceptions );
- registry.register( TX_PULL_REQUESTS_RECEIVED, (Gauge) txPullRequestsMetric::txPullRequestsReceived );
registry.register( TX_RETRIES, (Gauge) txRetryMetric::transactionsRetries );
registry.register( IS_LEADER, new LeaderGauge() );
registry.register( DROPPED_MESSAGES, (Gauge) messageQueueMetric::droppedMessages );
@@ -104,7 +101,6 @@ public void stop() throws IOException
registry.remove( APPEND_INDEX );
registry.remove( TERM );
registry.remove( LEADER_NOT_FOUND );
- registry.remove( TX_PULL_REQUESTS_RECEIVED );
registry.remove( TX_RETRIES );
registry.remove( IS_LEADER );
registry.remove( DROPPED_MESSAGES );
diff --git a/enterprise/metrics/src/test/java/org/neo4j/metrics/CoreEdgeMetricsIT.java b/enterprise/metrics/src/test/java/org/neo4j/metrics/CoreEdgeMetricsIT.java
index 59e6a50acf464..7351433725c6b 100644
--- a/enterprise/metrics/src/test/java/org/neo4j/metrics/CoreEdgeMetricsIT.java
+++ b/enterprise/metrics/src/test/java/org/neo4j/metrics/CoreEdgeMetricsIT.java
@@ -38,6 +38,7 @@
import org.neo4j.kernel.configuration.Config;
import org.neo4j.kernel.configuration.Settings;
import org.neo4j.kernel.internal.GraphDatabaseAPI;
+import org.neo4j.metrics.source.causalclustering.CatchUpMetrics;
import org.neo4j.metrics.source.causalclustering.CoreMetrics;
import org.neo4j.test.causalclustering.ClusterRule;
@@ -133,7 +134,7 @@ public void shouldMonitorCoreEdge() throws Exception
for ( final File homeDir : cluster.coreMembers().stream().map( CoreClusterMember::homeDir ).collect( Collectors.toList()) )
{
File metricsDir = new File( homeDir, "metrics" );
- total += readLongValue( metricsCsv( metricsDir, CoreMetrics.TX_PULL_REQUESTS_RECEIVED ) );
+ total += readLongValue( metricsCsv( metricsDir, CatchUpMetrics.TX_PULL_REQUESTS_RECEIVED ) );
}
return total;
}, greaterThan( 0L ), TIMEOUT, TimeUnit.SECONDS );