From 20231d2d82397ff0eb2e7812c2403f515a47e450 Mon Sep 17 00:00:00 2001 From: jimwebber Date: Sat, 4 Mar 2017 14:58:25 +0000 Subject: [PATCH] Adding a Data Center Local upstream strategy. A sensible out-of-the-box plugin for allowing read replicas to catch up from named others within their locality. Log the loaded strategies and their order of precedence. Does not catch up from self. --- .../catchup/tx/CatchupPollingProcess.java | 4 +- .../readreplica/ConnectWithinDataCenter.java | 69 +++++++++ .../EnterpriseReadReplicaEditionModule.java | 4 +- .../UpstreamDatabaseSelectionStrategy.java | 37 ++++- .../UpstreamDatabaseStrategiesLoader.java | 40 +++++- .../UpstreamDatabaseStrategySelector.java | 13 +- ...dreplica.UpstreamDatabaseSelectionStrategy | 1 + .../UpstreamDatabaseStrategiesLoaderTest.java | 16 ++- .../UpstreamDatabaseStrategySelectorTest.java | 5 +- .../ReadReplicaHierarchicalCatchupIT.java | 132 ++++++++++++++++++ .../ReadReplicaToReadReplicaCatchupIT.java | 7 +- .../metrics/source/Neo4jMetricsBuilder.java | 3 + .../causalclustering/CatchUpMetrics.java | 66 +++++++++ .../source/causalclustering/CoreMetrics.java | 4 - .../org/neo4j/metrics/CoreEdgeMetricsIT.java | 3 +- 15 files changed, 380 insertions(+), 24 deletions(-) create mode 100644 enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/ConnectWithinDataCenter.java create mode 100644 enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/scenarios/ReadReplicaHierarchicalCatchupIT.java create mode 100644 enterprise/metrics/src/main/java/org/neo4j/metrics/source/causalclustering/CatchUpMetrics.java diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/tx/CatchupPollingProcess.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/tx/CatchupPollingProcess.java index 4e309a2beaaf..d46e1fcb7c04 100644 --- 
a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/tx/CatchupPollingProcess.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/tx/CatchupPollingProcess.java @@ -239,7 +239,7 @@ private boolean pullAndApplyBatchOfTransactions( MemberId upstream, StoreId loca long lastQueuedTxId = applier.lastQueuedTxId(); pullRequestMonitor.txPullRequest( lastQueuedTxId ); TxPullRequest txPullRequest = new TxPullRequest( lastQueuedTxId, localStoreId ); - log.debug( "Pull transactions where tx id > %d [batch #%d]", lastQueuedTxId, batchCount ); + log.debug( "Pull transactions from %s where tx id > %d [batch #%d]", upstream, lastQueuedTxId, batchCount ); TxStreamFinishedResponse response; try @@ -275,7 +275,7 @@ public void onTxStreamFinishedResponse( CompletableFuture. + */ +package org.neo4j.causalclustering.readreplica; + +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Random; +import java.util.stream.Collectors; + +import org.neo4j.causalclustering.core.CausalClusteringSettings; +import org.neo4j.causalclustering.discovery.ReadReplicaInfo; +import org.neo4j.causalclustering.identity.MemberId; +import org.neo4j.helpers.Service; + +@Service.Implementation(UpstreamDatabaseSelectionStrategy.class) +public class ConnectWithinDataCenter extends UpstreamDatabaseSelectionStrategy +{ + private Random random = new Random();; + + public ConnectWithinDataCenter() + { + super( "connect-within-data-center" ); + } + + @Override + public Optional upstreamDatabase() throws UpstreamDatabaseSelectionException + { + Map replicas = topologyService.readReplicas().replicaMembers(); + + List tags = config.get( CausalClusteringSettings.server_tags ); + if ( tags.isEmpty() ) + { + return Optional.empty(); + } + + String myTag = tags.get( 0 ); + + List> choices = replicas.entrySet().stream() + .filter( entry -> entry.getValue().tags().contains( myTag ) && !entry.getKey().equals( myself ) ) + 
.collect( Collectors.toList() ); + + if ( choices.isEmpty() ) + { + return Optional.empty(); + } + else + { + return Optional.of( choices.get( random.nextInt( choices.size() ) ).getKey() ); + } + } +} diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/EnterpriseReadReplicaEditionModule.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/EnterpriseReadReplicaEditionModule.java index 2de4e74fc1b3..57290a74065b 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/EnterpriseReadReplicaEditionModule.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/EnterpriseReadReplicaEditionModule.java @@ -248,10 +248,12 @@ private OnlineBackupKernelExtension pickBackupExtension( NeoStoreDataSource data ConnectToRandomCoreServer defaultStrategy = new ConnectToRandomCoreServer(); defaultStrategy.setTopologyService( topologyService ); + defaultStrategy.setConfig( config ); + defaultStrategy.setMyself( myself ); UpstreamDatabaseStrategySelector upstreamDatabaseStrategySelector = new UpstreamDatabaseStrategySelector( defaultStrategy, - new UpstreamDatabaseStrategiesLoader( topologyService, config ), myself ); + new UpstreamDatabaseStrategiesLoader( topologyService, config, myself, logProvider ), myself, logProvider ); CatchupPollingProcess catchupProcess = new CatchupPollingProcess( logProvider, localDatabase, servicesToStopOnStoreCopy, catchUpClient, diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/UpstreamDatabaseSelectionStrategy.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/UpstreamDatabaseSelectionStrategy.java index 21163b306f42..a37b26a290a0 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/UpstreamDatabaseSelectionStrategy.java +++ 
b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/UpstreamDatabaseSelectionStrategy.java @@ -24,10 +24,13 @@ import org.neo4j.causalclustering.discovery.TopologyService; import org.neo4j.causalclustering.identity.MemberId; import org.neo4j.helpers.Service; +import org.neo4j.kernel.configuration.Config; public abstract class UpstreamDatabaseSelectionStrategy extends Service { - TopologyService topologyService; + protected TopologyService topologyService; + protected Config config; + protected MemberId myself; public UpstreamDatabaseSelectionStrategy( String key, String... altKeys ) { @@ -40,5 +43,37 @@ void setTopologyService( TopologyService topologyService ) this.topologyService = topologyService; } + void setConfig( Config config ) + { + this.config = config; + } + + void setMyself(MemberId myself) { this.myself = myself;} + public abstract Optional upstreamDatabase() throws UpstreamDatabaseSelectionException; + + @Override + public String toString() + { + return nicelyCommaSeparatedList( getKeys() ); + } + + private static String nicelyCommaSeparatedList( Iterable keys ) + { + StringBuilder sb = new StringBuilder(); + for ( String key : keys ) + { + sb.append( key ); + sb.append( "," ); + sb.append( " " ); + } + + int trimThese = sb.lastIndexOf( ", " ); + if ( trimThese > 1 ) + { + sb.replace( trimThese, sb.length(), "" ); + } + + return sb.toString(); + } } diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/UpstreamDatabaseStrategiesLoader.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/UpstreamDatabaseStrategiesLoader.java index 4e8227ad3115..f75b17e38005 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/UpstreamDatabaseStrategiesLoader.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/UpstreamDatabaseStrategiesLoader.java @@ -19,13 +19,17 @@ */ package 
org.neo4j.causalclustering.readreplica; +import java.util.Collection; import java.util.Iterator; import java.util.LinkedHashSet; import org.neo4j.causalclustering.core.CausalClusteringSettings; import org.neo4j.causalclustering.discovery.TopologyService; +import org.neo4j.causalclustering.identity.MemberId; import org.neo4j.helpers.Service; import org.neo4j.kernel.configuration.Config; +import org.neo4j.logging.Log; +import org.neo4j.logging.LogProvider; /** * Loads and initialises any service implementations of UpstreamDatabaseSelectionStrategy. @@ -35,11 +39,15 @@ public class UpstreamDatabaseStrategiesLoader implements Iterable iterator() if ( candidate.getKeys().iterator().next().equals( key ) ) { candidate.setTopologyService( topologyService ); + candidate.setConfig( config ); + candidate.setMyself( myself ); candidates.add( candidate ); } } } + + log( candidates ); + return candidates.iterator(); } + + private void log( LinkedHashSet candidates ) + { + log.debug( "Upstream database strategies loaded in order of precedence: " + + nicelyCommaSeparatedList( candidates ) ); + } + + private static String nicelyCommaSeparatedList( Collection items ) + { + StringBuilder sb = new StringBuilder(); + for ( UpstreamDatabaseSelectionStrategy strategy : items ) + { + sb.append( strategy.toString() ); + sb.append( "," ); + sb.append( " " ); + } + + int trimThese = sb.lastIndexOf( ", " ); + if ( trimThese > 1 ) + { + sb.replace( trimThese, sb.length(), "" ); + } + + return sb.toString(); + } } diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/UpstreamDatabaseStrategySelector.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/UpstreamDatabaseStrategySelector.java index 5be7b0d04090..0c9e7eda54b8 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/UpstreamDatabaseStrategySelector.java +++ 
b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/UpstreamDatabaseStrategySelector.java @@ -24,6 +24,9 @@ import java.util.NoSuchElementException; import org.neo4j.causalclustering.identity.MemberId; +import org.neo4j.logging.Log; +import org.neo4j.logging.LogProvider; +import org.neo4j.logging.NullLogProvider; import static org.neo4j.helpers.collection.Iterables.empty; @@ -31,16 +34,20 @@ public class UpstreamDatabaseStrategySelector { private LinkedHashSet strategies = new LinkedHashSet<>(); private MemberId myself; + private Log log; UpstreamDatabaseStrategySelector( UpstreamDatabaseSelectionStrategy defaultStrategy ) { - this( defaultStrategy, empty(), null ); + this( defaultStrategy, empty(), null, NullLogProvider.getInstance() ); } UpstreamDatabaseStrategySelector( UpstreamDatabaseSelectionStrategy defaultStrategy, - Iterable otherStrategies, MemberId myself ) + Iterable otherStrategies, MemberId myself, + LogProvider logProvider ) { this.myself = myself; + this.log = logProvider.getLog( getClass() ); + if ( otherStrategies != null ) { for ( UpstreamDatabaseSelectionStrategy otherStrategy : otherStrategies ) @@ -56,6 +63,7 @@ public MemberId bestUpstreamDatabase() throws UpstreamDatabaseSelectionException MemberId result = null; for ( UpstreamDatabaseSelectionStrategy strategy : strategies ) { + log.debug( "Trying selection strategy [%s]", strategy.toString() ); try { if ( strategy.upstreamDatabase().isPresent() ) @@ -76,6 +84,7 @@ public MemberId bestUpstreamDatabase() throws UpstreamDatabaseSelectionException "Could not find an upstream database with which to connect." 
); } + log.debug( "Selected upstream database [%s]", result ); return result; } } diff --git a/enterprise/causal-clustering/src/main/resources/META-INF/services/org.neo4j.causalclustering.readreplica.UpstreamDatabaseSelectionStrategy b/enterprise/causal-clustering/src/main/resources/META-INF/services/org.neo4j.causalclustering.readreplica.UpstreamDatabaseSelectionStrategy index d1c1cddf7b9a..19f6edf13de3 100644 --- a/enterprise/causal-clustering/src/main/resources/META-INF/services/org.neo4j.causalclustering.readreplica.UpstreamDatabaseSelectionStrategy +++ b/enterprise/causal-clustering/src/main/resources/META-INF/services/org.neo4j.causalclustering.readreplica.UpstreamDatabaseSelectionStrategy @@ -1 +1,2 @@ org.neo4j.causalclustering.readreplica.ConnectToRandomCoreServer +org.neo4j.causalclustering.readreplica.ConnectWithinDataCenter diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/readreplica/UpstreamDatabaseStrategiesLoaderTest.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/readreplica/UpstreamDatabaseStrategiesLoaderTest.java index 76e64df678a4..80296b551ab3 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/readreplica/UpstreamDatabaseStrategiesLoaderTest.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/readreplica/UpstreamDatabaseStrategiesLoaderTest.java @@ -22,9 +22,12 @@ import org.junit.Test; import java.util.Set; +import java.util.UUID; import org.neo4j.causalclustering.discovery.TopologyService; +import org.neo4j.causalclustering.identity.MemberId; import org.neo4j.kernel.configuration.Config; +import org.neo4j.logging.NullLogProvider; import static org.junit.Assert.assertEquals; import static org.mockito.Mockito.mock; @@ -33,6 +36,9 @@ public class UpstreamDatabaseStrategiesLoaderTest { + + private MemberId myself = new MemberId( UUID.randomUUID() ); + @Test public void shouldReturnConfiguredClassesOnly() throws Exception { 
@@ -40,8 +46,9 @@ public void shouldReturnConfiguredClassesOnly() throws Exception Config config = Config.defaults(); config.augment( stringMap( "causal_clustering.upstream_selection_strategy", "dummy" ) ); - UpstreamDatabaseStrategiesLoader - strategies = new UpstreamDatabaseStrategiesLoader( mock( TopologyService.class ), config ); + UpstreamDatabaseStrategiesLoader strategies = + new UpstreamDatabaseStrategiesLoader( mock( TopologyService.class ), config, + myself, NullLogProvider.getInstance() ); // when Set upstreamDatabaseSelectionStrategies = asSet( strategies.iterator() ); @@ -61,8 +68,9 @@ public void shouldReturnTheFirstStrategyThatWorksFromThoseConfigured() throws Ex stringMap( "causal_clustering.upstream_selection_strategy", "yet-another-dummy,dummy,another-dummy" ) ); // when - UpstreamDatabaseStrategiesLoader - strategies = new UpstreamDatabaseStrategiesLoader( mock( TopologyService.class ), config ); + UpstreamDatabaseStrategiesLoader strategies = + new UpstreamDatabaseStrategiesLoader( mock( TopologyService.class ), config, + myself, NullLogProvider.getInstance() ); // then assertEquals( UpstreamDatabaseStrategySelectorTest.YetAnotherDummyUpstreamDatabaseSelectionStrategy.class, diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/readreplica/UpstreamDatabaseStrategySelectorTest.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/readreplica/UpstreamDatabaseStrategySelectorTest.java index abc211e029d5..8bfa581fded9 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/readreplica/UpstreamDatabaseStrategySelectorTest.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/readreplica/UpstreamDatabaseStrategySelectorTest.java @@ -32,6 +32,7 @@ import org.neo4j.causalclustering.identity.ClusterId; import org.neo4j.causalclustering.identity.MemberId; import org.neo4j.helpers.Service; +import org.neo4j.logging.NullLogProvider; import static 
org.junit.Assert.assertEquals; import static org.mockito.Mockito.mock; @@ -59,7 +60,7 @@ public void shouldReturnTheMemberIdFromFirstSucessfulStrategy() throws Exception when( goodOne.upstreamDatabase() ).thenReturn( Optional.of( theMemberId ) ); UpstreamDatabaseStrategySelector selector = - new UpstreamDatabaseStrategySelector( badOne, iterable( goodOne, anotherBadOne ), null ); + new UpstreamDatabaseStrategySelector( badOne, iterable( goodOne, anotherBadOne ), null, NullLogProvider.getInstance() ); // when MemberId result = selector.bestUpstreamDatabase(); @@ -104,7 +105,7 @@ public void shouldUseSpecifiedStrategyInPreferenceToDefault() throws Exception when( mockStrategy.upstreamDatabase() ).thenReturn( Optional.of( new MemberId( UUID.randomUUID() ) ) ); UpstreamDatabaseStrategySelector selector = - new UpstreamDatabaseStrategySelector( shouldNotUse, iterable( mockStrategy ), null ); + new UpstreamDatabaseStrategySelector( shouldNotUse, iterable( mockStrategy ), null, NullLogProvider.getInstance() ); // when selector.bestUpstreamDatabase(); diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/scenarios/ReadReplicaHierarchicalCatchupIT.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/scenarios/ReadReplicaHierarchicalCatchupIT.java new file mode 100644 index 000000000000..b6fa1b6e0025 --- /dev/null +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/scenarios/ReadReplicaHierarchicalCatchupIT.java @@ -0,0 +1,132 @@ +/* + * Copyright (c) 2002-2017 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. 
+ * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ +package org.neo4j.causalclustering.scenarios; + +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; + +import java.io.File; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; +import java.util.UUID; + +import org.neo4j.backup.OnlineBackupSettings; +import org.neo4j.causalclustering.core.CausalClusteringSettings; +import org.neo4j.causalclustering.discovery.Cluster; +import org.neo4j.causalclustering.discovery.CoreClusterMember; +import org.neo4j.causalclustering.discovery.HazelcastDiscoveryServiceFactory; +import org.neo4j.causalclustering.discovery.ReadReplica; +import org.neo4j.causalclustering.identity.MemberId; +import org.neo4j.causalclustering.readreplica.UpstreamDatabaseSelectionException; +import org.neo4j.causalclustering.readreplica.UpstreamDatabaseSelectionStrategy; +import org.neo4j.function.ThrowingSupplier; +import org.neo4j.graphdb.GraphDatabaseService; +import org.neo4j.graphdb.Node; +import org.neo4j.graphdb.Transaction; +import org.neo4j.helpers.Service; +import org.neo4j.helpers.collection.Pair; +import org.neo4j.kernel.impl.api.scan.LabelScanStoreProvider; +import org.neo4j.kernel.monitoring.Monitors; +import org.neo4j.test.causalclustering.ClusterRule; + +import static java.util.concurrent.TimeUnit.MINUTES; +import static org.hamcrest.CoreMatchers.startsWith; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.core.Is.is; +import static org.neo4j.causalclustering.helpers.DataCreator.createLabelledNodesWithProperty; +import static 
org.neo4j.causalclustering.scenarios.ReadReplicaToReadReplicaCatchupIT.SpecificReplicaStrategy.upstreamFactory; +import static org.neo4j.causalclustering.scenarios.ReadReplicaToReadReplicaCatchupIT.checkDataHasReplicatedToReadReplicas; +import static org.neo4j.com.storecopy.StoreUtil.TEMP_COPY_DIRECTORY_NAME; +import static org.neo4j.graphdb.Label.label; +import static org.neo4j.helpers.collection.Iterables.count; +import static org.neo4j.test.assertion.Assert.assertEventually; + +public class ReadReplicaHierarchicalCatchupIT +{ + private Map serverTags = new HashMap<>(); + + @Before + public void setup() + { + serverTags.put( 0, "NORTH" ); + serverTags.put( 1, "NORTH" ); + serverTags.put( 2, "NORTH" ); + + serverTags.put( 3, "EAST" ); + serverTags.put( 5, "EAST" ); + + serverTags.put( 4, "WEST" ); + serverTags.put( 6, "WEST" ); + } + + @Rule + public ClusterRule clusterRule = + new ClusterRule( getClass() ).withNumberOfCoreMembers( 3 ).withNumberOfReadReplicas( 0 ) + .withSharedCoreParam( CausalClusteringSettings.cluster_topology_refresh, "5s" ) + .withDiscoveryServiceFactory( new HazelcastDiscoveryServiceFactory() ); + + @Test + public void shouldCatchupThroughHierarchy() throws Throwable + { + clusterRule = clusterRule + .withInstanceReadReplicaParam( CausalClusteringSettings.server_tags, id -> serverTags.get( id ) ) + .withInstanceCoreParam( CausalClusteringSettings.server_tags, id -> serverTags.get( id ) ); + + // given + Cluster cluster = clusterRule.startCluster(); + int numberOfNodesToCreate = 100; + + cluster.coreTx( ( db, tx ) -> + { + db.schema().constraintFor( label( "Foo" ) ).assertPropertyIsUnique( "foobar" ).create(); + tx.success(); + } ); + + // 0, 1, 2 are core instances + createLabelledNodesWithProperty( cluster, numberOfNodesToCreate, label( "Foo" ), + () -> Pair.of( "foobar", String.format( "baz_bat%s", UUID.randomUUID() ) ) ); + + // 3, 4 are other DCs + ReadReplica east3 = cluster.addReadReplicaWithId( 3 ); + east3.start(); + ReadReplica 
west4 = cluster.addReadReplicaWithId( 4 ); + west4.start(); + + checkDataHasReplicatedToReadReplicas( cluster, numberOfNodesToCreate ); + + for ( CoreClusterMember coreClusterMember : cluster.coreMembers() ) + { + coreClusterMember.stopCatchupServer(); + } + + // 5, 6 are other DCs + ReadReplica east5 = cluster.addReadReplicaWithId( 5 ); + east5.setUpstreamDatabaseSelectionStrategy( "connect-within-data-center" ); + east5.start(); + ReadReplica west6 = cluster.addReadReplicaWithId( 6 ); + west6.setUpstreamDatabaseSelectionStrategy( "connect-within-data-center" ); + west6.start(); + + checkDataHasReplicatedToReadReplicas( cluster, numberOfNodesToCreate ); + + } +} diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/scenarios/ReadReplicaToReadReplicaCatchupIT.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/scenarios/ReadReplicaToReadReplicaCatchupIT.java index 2e685dee77b2..adfdee3335a7 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/scenarios/ReadReplicaToReadReplicaCatchupIT.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/scenarios/ReadReplicaToReadReplicaCatchupIT.java @@ -81,9 +81,6 @@ public void shouldEventuallyPullTransactionAcrossReadReplicas() throws Throwable ReadReplica firstReadReplica = cluster.addReadReplicaWithIdAndMonitors( 101, new Monitors() ); - File labelScanStore = LabelScanStoreProvider - .getStoreDirectory( new File( firstReadReplica.storeDir(), TEMP_COPY_DIRECTORY_NAME ) ); - firstReadReplica.start(); checkDataHasReplicatedToReadReplicas( cluster, numberOfNodesToCreate ); @@ -124,8 +121,6 @@ public void shouldCatchUpFromCoresWhenPreferredReadReplicasAreUnavailable() thro ReadReplica firstReadReplica = cluster.addReadReplicaWithIdAndMonitors( firstReadReplicaLocalMemberId, new Monitors() ); - File labelScanStore = LabelScanStoreProvider - .getStoreDirectory( new File( firstReadReplica.storeDir(), 
TEMP_COPY_DIRECTORY_NAME ) ); firstReadReplica.start(); @@ -155,7 +150,7 @@ public void shouldCatchUpFromCoresWhenPreferredReadReplicasAreUnavailable() thro checkDataHasReplicatedToReadReplicas( cluster, numberOfNodes * 2 ); } - private void checkDataHasReplicatedToReadReplicas( Cluster cluster, long numberOfNodes ) throws Exception + static void checkDataHasReplicatedToReadReplicas( Cluster cluster, long numberOfNodes ) throws Exception { for ( final ReadReplica server : cluster.readReplicas() ) { diff --git a/enterprise/metrics/src/main/java/org/neo4j/metrics/source/Neo4jMetricsBuilder.java b/enterprise/metrics/src/main/java/org/neo4j/metrics/source/Neo4jMetricsBuilder.java index a32344a9cc30..6b18d61759ba 100644 --- a/enterprise/metrics/src/main/java/org/neo4j/metrics/source/Neo4jMetricsBuilder.java +++ b/enterprise/metrics/src/main/java/org/neo4j/metrics/source/Neo4jMetricsBuilder.java @@ -39,6 +39,7 @@ import org.neo4j.kernel.monitoring.Monitors; import org.neo4j.metrics.MetricsSettings; import org.neo4j.metrics.output.EventReporter; +import org.neo4j.metrics.source.causalclustering.CatchUpMetrics; import org.neo4j.metrics.source.causalclustering.CoreMetrics; import org.neo4j.metrics.source.causalclustering.ReadReplicaMetrics; import org.neo4j.metrics.source.cluster.ClusterMetrics; @@ -197,11 +198,13 @@ public boolean build() if ( mode == OperationalMode.core ) { life.add( new CoreMetrics( dependencies.monitors(), registry, dependencies.raft() ) ); + life.add( new CatchUpMetrics( dependencies.monitors(), registry ) ); result = true; } else if ( mode == OperationalMode.read_replica ) { life.add( new ReadReplicaMetrics( dependencies.monitors(), registry ) ); + life.add( new CatchUpMetrics( dependencies.monitors(), registry ) ); result = true; } else diff --git a/enterprise/metrics/src/main/java/org/neo4j/metrics/source/causalclustering/CatchUpMetrics.java b/enterprise/metrics/src/main/java/org/neo4j/metrics/source/causalclustering/CatchUpMetrics.java new file 
mode 100644 index 000000000000..e7af5b414e09 --- /dev/null +++ b/enterprise/metrics/src/main/java/org/neo4j/metrics/source/causalclustering/CatchUpMetrics.java @@ -0,0 +1,66 @@ +/* + * Copyright (c) 2002-2017 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ +package org.neo4j.metrics.source.causalclustering; + +import java.io.IOException; +import java.util.function.Supplier; + +import com.codahale.metrics.Gauge; +import com.codahale.metrics.MetricRegistry; + +import org.neo4j.causalclustering.core.consensus.CoreMetaData; +import org.neo4j.kernel.impl.annotations.Documented; +import org.neo4j.kernel.lifecycle.LifecycleAdapter; +import org.neo4j.kernel.monitoring.Monitors; + +import static com.codahale.metrics.MetricRegistry.name; + +@Documented(".CatchUp Metrics") +public class CatchUpMetrics extends LifecycleAdapter +{ + private static final String CAUSAL_CLUSTERING_PREFIX = "neo4j.causal_clustering.catchup"; + + @Documented("TX pull requests received from read replicas") + public static final String TX_PULL_REQUESTS_RECEIVED = name( CAUSAL_CLUSTERING_PREFIX, "tx_pull_requests_received" ); + + private Monitors monitors; + private MetricRegistry registry; + private final TxPullRequestsMetric txPullRequestsMetric = new TxPullRequestsMetric(); + + public 
CatchUpMetrics( Monitors monitors, MetricRegistry registry ) + { + this.monitors = monitors; + this.registry = registry; + } + + @Override + public void start() throws Throwable + { + monitors.addMonitorListener( txPullRequestsMetric ); + registry.register( TX_PULL_REQUESTS_RECEIVED, (Gauge) txPullRequestsMetric::txPullRequestsReceived ); + } + + @Override + public void stop() throws IOException + { + registry.remove( TX_PULL_REQUESTS_RECEIVED ); + monitors.removeMonitorListener( txPullRequestsMetric ); + } +} diff --git a/enterprise/metrics/src/main/java/org/neo4j/metrics/source/causalclustering/CoreMetrics.java b/enterprise/metrics/src/main/java/org/neo4j/metrics/source/causalclustering/CoreMetrics.java index fc46add3c91f..6444e037c88f 100644 --- a/enterprise/metrics/src/main/java/org/neo4j/metrics/source/causalclustering/CoreMetrics.java +++ b/enterprise/metrics/src/main/java/org/neo4j/metrics/source/causalclustering/CoreMetrics.java @@ -45,8 +45,6 @@ public class CoreMetrics extends LifecycleAdapter public static final String TERM = name( CAUSAL_CLUSTERING_PREFIX, "term" ); @Documented("Leader was not found while attempting to commit a transaction") public static final String LEADER_NOT_FOUND = name( CAUSAL_CLUSTERING_PREFIX, "leader_not_found" ); - @Documented("TX pull requests received from read replicas") - public static final String TX_PULL_REQUESTS_RECEIVED = name( CAUSAL_CLUSTERING_PREFIX, "tx_pull_requests_received" ); @Documented("Transaction retries") public static final String TX_RETRIES = name( CAUSAL_CLUSTERING_PREFIX, "tx_retries" ); @Documented("Is this server the leader?") @@ -90,7 +88,6 @@ public void start() throws Throwable registry.register( APPEND_INDEX, (Gauge) raftLogAppendIndexMetric::appendIndex ); registry.register( TERM, (Gauge) raftTermMetric::term ); registry.register( LEADER_NOT_FOUND, (Gauge) leaderNotFoundMetric::leaderNotFoundExceptions ); - registry.register( TX_PULL_REQUESTS_RECEIVED, (Gauge) 
txPullRequestsMetric::txPullRequestsReceived ); registry.register( TX_RETRIES, (Gauge) txRetryMetric::transactionsRetries ); registry.register( IS_LEADER, new LeaderGauge() ); registry.register( DROPPED_MESSAGES, (Gauge) messageQueueMetric::droppedMessages ); @@ -104,7 +101,6 @@ public void stop() throws IOException registry.remove( APPEND_INDEX ); registry.remove( TERM ); registry.remove( LEADER_NOT_FOUND ); - registry.remove( TX_PULL_REQUESTS_RECEIVED ); registry.remove( TX_RETRIES ); registry.remove( IS_LEADER ); registry.remove( DROPPED_MESSAGES ); diff --git a/enterprise/metrics/src/test/java/org/neo4j/metrics/CoreEdgeMetricsIT.java b/enterprise/metrics/src/test/java/org/neo4j/metrics/CoreEdgeMetricsIT.java index 59e6a50acf46..7351433725c6 100644 --- a/enterprise/metrics/src/test/java/org/neo4j/metrics/CoreEdgeMetricsIT.java +++ b/enterprise/metrics/src/test/java/org/neo4j/metrics/CoreEdgeMetricsIT.java @@ -38,6 +38,7 @@ import org.neo4j.kernel.configuration.Config; import org.neo4j.kernel.configuration.Settings; import org.neo4j.kernel.internal.GraphDatabaseAPI; +import org.neo4j.metrics.source.causalclustering.CatchUpMetrics; import org.neo4j.metrics.source.causalclustering.CoreMetrics; import org.neo4j.test.causalclustering.ClusterRule; @@ -133,7 +134,7 @@ public void shouldMonitorCoreEdge() throws Exception for ( final File homeDir : cluster.coreMembers().stream().map( CoreClusterMember::homeDir ).collect( Collectors.toList()) ) { File metricsDir = new File( homeDir, "metrics" ); - total += readLongValue( metricsCsv( metricsDir, CoreMetrics.TX_PULL_REQUESTS_RECEIVED ) ); + total += readLongValue( metricsCsv( metricsDir, CatchUpMetrics.TX_PULL_REQUESTS_RECEIVED ) ); } return total; }, greaterThan( 0L ), TIMEOUT, TimeUnit.SECONDS );