From d204f729dac878d072f9d0bb8be7f147cf14beff Mon Sep 17 00:00:00 2001 From: Davide Grohmann Date: Wed, 18 May 2016 12:08:22 +0200 Subject: [PATCH] Fix core edge metrics The commit index listener was never notified about the new commit indexes. This change will fix the issue by moving the listener notification into the CoreState. --- .../coreedge/raft/log/MonitoredRaftLog.java | 12 +-- .../neo4j/coreedge/raft/state/CoreState.java | 12 ++- .../core/EnterpriseCoreEditionModule.java | 7 +- .../org/neo4j/coreedge/discovery/Cluster.java | 5 +- .../coreedge/raft/state/CoreStateTest.java | 9 +- .../org/neo4j/metrics/CoreEdgeMetricsIT.java | 90 ++++++++++--------- 6 files changed, 72 insertions(+), 63 deletions(-) diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/MonitoredRaftLog.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/MonitoredRaftLog.java index a54acbc6859ad..d64e5254ab434 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/MonitoredRaftLog.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/MonitoredRaftLog.java @@ -28,23 +28,13 @@ public class MonitoredRaftLog implements RaftLog { - public static final String APPEND_INDEX_TAG = "appendIndex"; - public static final String COMMIT_INDEX_TAG = "commitIndex"; - private final RaftLog delegate; private final RaftLogAppendIndexMonitor appendIndexMonitor; - private final RaftLogCommitIndexMonitor commitIndexMonitor; public MonitoredRaftLog( RaftLog delegate, Monitors monitors ) { this.delegate = delegate; - this.appendIndexMonitor = monitors.newMonitor( RaftLogAppendIndexMonitor.class, getClass(), APPEND_INDEX_TAG ); - this.commitIndexMonitor = monitors.newMonitor( RaftLogCommitIndexMonitor.class, getClass(), COMMIT_INDEX_TAG ); - } - - public RaftLog delegate() - { - return delegate; + this.appendIndexMonitor = monitors.newMonitor( RaftLogAppendIndexMonitor.class, getClass() ); } @Override diff --git 
a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/state/CoreState.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/state/CoreState.java index 137653ca21f05..eff71ea77054f 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/state/CoreState.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/state/CoreState.java @@ -29,6 +29,7 @@ import org.neo4j.coreedge.raft.log.RaftLog; import org.neo4j.coreedge.raft.log.RaftLogCompactedException; import org.neo4j.coreedge.raft.log.RaftLogCursor; +import org.neo4j.coreedge.raft.log.monitoring.RaftLogCommitIndexMonitor; import org.neo4j.coreedge.raft.log.pruning.LogPruner; import org.neo4j.coreedge.raft.replication.DistributedOperation; import org.neo4j.coreedge.raft.replication.ProgressTracker; @@ -39,12 +40,12 @@ import org.neo4j.coreedge.server.edge.CoreServerSelectionStrategy; import org.neo4j.kernel.internal.DatabaseHealth; import org.neo4j.kernel.lifecycle.LifecycleAdapter; +import org.neo4j.kernel.monitoring.Monitors; import org.neo4j.logging.Log; import org.neo4j.logging.LogProvider; public class CoreState extends LifecycleAdapter implements RaftStateMachine, LogPruner { - private static final long NOTHING = -1; private CoreStateMachines coreStateMachines; private final RaftLog raftLog; @@ -65,6 +66,7 @@ public class CoreState extends LifecycleAdapter implements RaftStateMachine, Log private final CoreServerSelectionStrategy selectionStrategy; private final CoreStateDownloader downloader; + private final RaftLogCommitIndexMonitor commitIndexMonitor; public CoreState( RaftLog raftLog, @@ -77,7 +79,8 @@ public CoreState( StateStorage> sessionStorage, CoreServerSelectionStrategy selectionStrategy, CoreStateApplier applier, - CoreStateDownloader downloader ) + CoreStateDownloader downloader, + Monitors monitors ) { this.raftLog = raftLog; this.lastFlushedStorage = lastFlushedStorage; @@ -90,6 +93,7 @@ public CoreState( this.selectionStrategy = selectionStrategy; 
this.log = logProvider.getLog( getClass() ); this.dbHealth = dbHealth; + this.commitIndexMonitor = monitors.newMonitor( RaftLogCommitIndexMonitor.class, getClass() ); } public synchronized void setStateMachine( CoreStateMachines coreStateMachines, long lastApplied ) @@ -101,10 +105,12 @@ public synchronized void setStateMachine( CoreStateMachines coreStateMachines, l @Override public synchronized void notifyCommitted( long commitIndex ) { - if ( this.lastSeenCommitIndex != commitIndex ) + assert this.lastSeenCommitIndex <= commitIndex; + if ( this.lastSeenCommitIndex < commitIndex ) { this.lastSeenCommitIndex = commitIndex; submitApplyJob( commitIndex ); + commitIndexMonitor.commitIndex( commitIndex ); } } diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/core/EnterpriseCoreEditionModule.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/core/EnterpriseCoreEditionModule.java index 14c556e07b725..5059f6b16decd 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/core/EnterpriseCoreEditionModule.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/core/EnterpriseCoreEditionModule.java @@ -274,8 +274,8 @@ public EnterpriseCoreEditionModule( final PlatformModule platformModule, final DelayedRenewableTimeoutService raftTimeoutService = new DelayedRenewableTimeoutService( Clock.systemUTC(), logProvider ); - RaftLog underlyingLog = createRaftLog( config, life, fileSystem, clusterStateDirectory, marshal, logProvider, - databaseHealthSupplier ); + RaftLog underlyingLog = createRaftLog( + config, life, fileSystem, clusterStateDirectory, marshal, logProvider, databaseHealthSupplier ); MonitoredRaftLog raftLog = new MonitoredRaftLog( underlyingLog, platformModule.monitors ); @@ -335,7 +335,8 @@ fileSystem, new File( clusterStateDirectory, "session-tracker-state" ), "session coreState = new CoreState( raftLog, config.get( CoreEdgeClusterSettings.state_machine_flush_window_size ), 
databaseHealthSupplier, logProvider, progressTracker, lastFlushedStorage, lastApplyingStorage, - sessionTrackerStorage, new NotMyselfSelectionStrategy( discoveryService, myself ), applier, downloader ); + sessionTrackerStorage, new NotMyselfSelectionStrategy( discoveryService, myself ), applier, + downloader, platformModule.monitors ); raft = createRaft( life, loggingOutbound, discoveryService, config, messageLogger, raftLog, coreState, fileSystem, clusterStateDirectory, myself, logProvider, raftServer, diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/discovery/Cluster.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/discovery/Cluster.java index 38ba41d097429..6dd07e37b4458 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/discovery/Cluster.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/discovery/Cluster.java @@ -44,11 +44,13 @@ import org.neo4j.coreedge.server.CoreEdgeClusterSettings; import org.neo4j.coreedge.server.core.CoreGraphDatabase; import org.neo4j.coreedge.server.edge.EdgeGraphDatabase; +import org.neo4j.dbms.DatabaseManagementSystemSettings; import org.neo4j.graphdb.Transaction; import org.neo4j.graphdb.TransactionFailureException; import org.neo4j.graphdb.factory.GraphDatabaseSettings; import org.neo4j.kernel.GraphDatabaseDependencies; import org.neo4j.kernel.configuration.Config; +import org.neo4j.kernel.configuration.Settings; import org.neo4j.kernel.impl.factory.GraphDatabaseFacade; import org.neo4j.kernel.impl.store.format.standard.StandardV3_0; @@ -180,9 +182,6 @@ private Map serverParams( String serverType, int serverId, String params.put( ClusterSettings.cluster_name.name(), CLUSTER_NAME ); params.put( ClusterSettings.server_id.name(), String.valueOf( serverId ) ); params.put( CoreEdgeClusterSettings.initial_core_cluster_members.name(), initialHosts ); - params.put( "metrics.csv.enabled", "true" ); - params.put( "metrics.neo4j.core_edge.enabled", "true" ); - params.put( 
"metrics.csv.path", "metrics" ); return params; } diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/state/CoreStateTest.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/state/CoreStateTest.java index 9e25c2822ff71..a2c6ffc8a114a 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/state/CoreStateTest.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/state/CoreStateTest.java @@ -27,6 +27,7 @@ import org.neo4j.coreedge.raft.NewLeaderBarrier; import org.neo4j.coreedge.raft.log.InMemoryRaftLog; import org.neo4j.coreedge.raft.log.RaftLogEntry; +import org.neo4j.coreedge.raft.log.monitoring.RaftLogCommitIndexMonitor; import org.neo4j.coreedge.raft.replication.DistributedOperation; import org.neo4j.coreedge.raft.replication.ProgressTrackerImpl; import org.neo4j.coreedge.raft.replication.ReplicatedContent; @@ -39,6 +40,7 @@ import org.neo4j.coreedge.server.edge.CoreServerSelectionStrategy; import org.neo4j.kernel.impl.core.DatabasePanicEventGenerator; import org.neo4j.kernel.internal.DatabaseHealth; +import org.neo4j.kernel.monitoring.Monitors; import org.neo4j.logging.NullLogProvider; import static junit.framework.TestCase.assertEquals; @@ -65,13 +67,15 @@ public class CoreStateTest private final int flushEvery = 10; private final CoreStateApplier applier = new CoreStateApplier( NullLogProvider.getInstance() ); + private final Monitors monitors = new Monitors(); private final CoreState coreState = new CoreState( raftLog, flushEvery, () -> dbHealth, NullLogProvider.getInstance(), new ProgressTrackerImpl( globalSession ), lastFlushedStorage, lastApplyingStorage, sessionStorage, - mock( CoreServerSelectionStrategy.class ), applier, mock( CoreStateDownloader.class ) ); + mock( CoreServerSelectionStrategy.class ), applier, mock( CoreStateDownloader.class ), monitors ); private ReplicatedTransaction nullTx = new ReplicatedTransaction( null ); private final CoreStateMachines txStateMachine = 
txStateMachinesMock(); + private CoreStateMachines txStateMachinesMock() { CoreStateMachines stateMachines = mock( CoreStateMachines.class ); @@ -97,6 +101,8 @@ private synchronized ReplicatedContent operation( CoreReplicatedContent tx ) public void shouldApplyCommittedCommand() throws Exception { // given + RaftLogCommitIndexMonitor listener = mock( RaftLogCommitIndexMonitor.class ); + monitors.addMonitorListener( listener ); coreState.setStateMachine( txStateMachine, -1 ); coreState.start(); @@ -111,6 +117,7 @@ public void shouldApplyCommittedCommand() throws Exception verify( txStateMachine ).dispatch( nullTx, 0 ); verify( txStateMachine ).dispatch( nullTx, 1 ); verify( txStateMachine ).dispatch( nullTx, 2 ); + verify( listener).commitIndex( 2 ); } @Test diff --git a/enterprise/metrics/src/test/java/org/neo4j/metrics/CoreEdgeMetricsIT.java b/enterprise/metrics/src/test/java/org/neo4j/metrics/CoreEdgeMetricsIT.java index 47ccb90c73cfb..b31cfef5d7b8a 100644 --- a/enterprise/metrics/src/test/java/org/neo4j/metrics/CoreEdgeMetricsIT.java +++ b/enterprise/metrics/src/test/java/org/neo4j/metrics/CoreEdgeMetricsIT.java @@ -30,13 +30,18 @@ import org.neo4j.coreedge.discovery.Cluster; import org.neo4j.coreedge.server.core.CoreGraphDatabase; import org.neo4j.coreedge.server.edge.EdgeGraphDatabase; +import org.neo4j.cypher.internal.compiler.v3_1.planner.logical.Metrics; import org.neo4j.function.ThrowingSupplier; import org.neo4j.graphdb.GraphDatabaseService; import org.neo4j.graphdb.Node; import org.neo4j.graphdb.Transaction; +import org.neo4j.helpers.collection.Iterables; import org.neo4j.kernel.configuration.Config; +import org.neo4j.kernel.configuration.Settings; import org.neo4j.kernel.impl.factory.GraphDatabaseFacade; +import org.neo4j.kernel.internal.GraphDatabaseAPI; import org.neo4j.metrics.source.coreedge.CoreMetrics; +import org.neo4j.test.coreedge.ClusterRule; import org.neo4j.test.rule.TargetDirectory; import static java.util.concurrent.TimeUnit.SECONDS; @@ 
-46,7 +51,9 @@ import static org.junit.Assert.assertEquals; import static org.neo4j.coreedge.server.CoreEdgeClusterSettings.raft_advertised_address; import static org.neo4j.graphdb.Label.label; +import static org.neo4j.helpers.collection.Iterables.concat; import static org.neo4j.helpers.collection.Iterables.count; +import static org.neo4j.metrics.MetricsSettings.csvPath; import static org.neo4j.metrics.MetricsTestHelper.metricsCsv; import static org.neo4j.metrics.MetricsTestHelper.readLongValue; import static org.neo4j.metrics.source.coreedge.EdgeMetrics.PULL_UPDATES; @@ -56,8 +63,18 @@ public class CoreEdgeMetricsIT { + private static final int TIMEOUT = 15; + @Rule - public final TargetDirectory.TestDirectory dir = TargetDirectory.testDirForTest( getClass() ); + public final ClusterRule clusterRule = new ClusterRule( getClass() ) + .withNumberOfCoreServers( 3 ) + .withNumberOfEdgeServers( 1 ) + .withSharedCoreParam( MetricsSettings.metricsEnabled, Settings.TRUE ) + .withSharedEdgeParam( MetricsSettings.metricsEnabled, Settings.TRUE ) + .withSharedCoreParam( MetricsSettings.csvEnabled, Settings.TRUE ) + .withSharedEdgeParam( MetricsSettings.csvEnabled, Settings.TRUE ) + .withSharedCoreParam( MetricsSettings.csvInterval, "100ms" ) + .withSharedEdgeParam( MetricsSettings.csvInterval, "100ms" ); private Cluster cluster; @@ -74,11 +91,10 @@ public void shutdown() throws ExecutionException, InterruptedException public void shouldMonitorCoreEdge() throws Exception { // given - File dbDir = dir.directory(); - cluster = Cluster.start( dbDir, 3, 1 ); + cluster = clusterRule.startCluster(); // when - GraphDatabaseService coreDB = cluster.awaitLeader( 5000 ); + CoreGraphDatabase coreDB = cluster.awaitLeader( 5000 ); try ( Transaction tx = coreDB.beginTx() ) { @@ -88,81 +104,71 @@ public void shouldMonitorCoreEdge() throws Exception } // then - for ( final CoreGraphDatabase db : cluster.coreServers() ) - { - assertAllNodesVisible( db ); - } - - for ( final EdgeGraphDatabase 
db : cluster.edgeServers() ) + for ( GraphDatabaseAPI db : concat( cluster.coreServers(), cluster.edgeServers() ) ) { assertAllNodesVisible( db ); } - File coreServerMetricsDir = new File( cluster.getCoreServerById( 0 ).getStoreDir(), "metrics" ); + File coreServerMetricsDir = new File( cluster.getCoreServerById( 0 ).getStoreDir(), csvPath.getDefaultValue() ); assertEventually( "append index eventually accurate", () -> readLongValue( metricsCsv( coreServerMetricsDir, CoreMetrics.APPEND_INDEX ) ), - greaterThan( 0L ), 5, TimeUnit.SECONDS ); + greaterThan( 0L ), TIMEOUT, TimeUnit.SECONDS ); assertEventually( "commit index eventually accurate", () -> readLongValue( metricsCsv( coreServerMetricsDir, CoreMetrics.COMMIT_INDEX ) ), - greaterThan( 0L ), 5, TimeUnit.SECONDS ); + greaterThan( 0L ), TIMEOUT, TimeUnit.SECONDS ); assertEventually( "term eventually accurate", () -> readLongValue( metricsCsv( coreServerMetricsDir, CoreMetrics.TERM ) ), - greaterThanOrEqualTo( 0L ), 5, TimeUnit.SECONDS ); + greaterThanOrEqualTo( 0L ), TIMEOUT, TimeUnit.SECONDS ); assertEventually( "leader not found eventually accurate", () -> readLongValue( metricsCsv( coreServerMetricsDir, CoreMetrics.LEADER_NOT_FOUND ) ), - equalTo( 0L ), 5, TimeUnit.SECONDS ); - - assertEventually( "tx pull requests received eventually accurate", - () -> - { - long total = 0; - for ( final CoreGraphDatabase db : cluster.coreServers() ) - { - File metricsDir = new File( db.getStoreDir(), "metrics" ); - total += readLongValue( metricsCsv( metricsDir, CoreMetrics.TX_PULL_REQUESTS_RECEIVED ) ); - } - return total; - }, - greaterThan( 0L ), 5, TimeUnit.SECONDS ); + equalTo( 0L ), TIMEOUT, TimeUnit.SECONDS ); + + assertEventually( "tx pull requests received eventually accurate", () -> { + long total = 0; + for ( final CoreGraphDatabase db : cluster.coreServers() ) + { + File metricsDir = new File( db.getStoreDir(), "metrics" ); + total += readLongValue( metricsCsv( metricsDir, CoreMetrics.TX_PULL_REQUESTS_RECEIVED ) 
); + } + return total; + }, greaterThan( 0L ), TIMEOUT, TimeUnit.SECONDS ); assertEventually( "tx retries eventually accurate", - () -> readLongValue( metricsCsv( coreServerMetricsDir, CoreMetrics.TX_RETRIES ) ), - equalTo( 0L ), 5, TimeUnit.SECONDS ); + () -> readLongValue( metricsCsv( coreServerMetricsDir, CoreMetrics.TX_RETRIES ) ), equalTo( 0L ), + TIMEOUT, TimeUnit.SECONDS ); assertEventually( "is leader eventually accurate", () -> readLongValue( metricsCsv( coreServerMetricsDir, CoreMetrics.IS_LEADER ) ), - greaterThanOrEqualTo( 0L ), 5, TimeUnit.SECONDS ); + greaterThanOrEqualTo( 0L ), TIMEOUT, TimeUnit.SECONDS ); File edgeServerMetricsDir = new File( cluster.getEdgeServerById( 0 ).getStoreDir(), "metrics" ); assertEventually( "pull update request registered", () -> readLongValue( metricsCsv( edgeServerMetricsDir, PULL_UPDATES ) ), - greaterThan( 0L ), 5, TimeUnit.SECONDS ); + greaterThan( 0L ), TIMEOUT, TimeUnit.SECONDS ); assertEventually( "pull update request registered", - () -> - readLongValue( metricsCsv( edgeServerMetricsDir, PULL_UPDATE_HIGHEST_TX_ID_REQUESTED ) ), - greaterThan( 0L ), 5, TimeUnit.SECONDS ); + () -> readLongValue( metricsCsv( edgeServerMetricsDir, PULL_UPDATE_HIGHEST_TX_ID_REQUESTED ) ), + greaterThan( 0L ), TIMEOUT, TimeUnit.SECONDS ); assertEventually( "pull update response received", - () -> - readLongValue( metricsCsv( edgeServerMetricsDir, PULL_UPDATE_HIGHEST_TX_ID_RECEIVED ) ), - greaterThan( 0L ), 5, TimeUnit.SECONDS ); + () -> readLongValue( metricsCsv( edgeServerMetricsDir, PULL_UPDATE_HIGHEST_TX_ID_RECEIVED ) ), + greaterThan( 0L ), TIMEOUT, TimeUnit.SECONDS ); assertEventually( "dropped messages eventually accurate", () -> readLongValue( metricsCsv( coreServerMetricsDir, CoreMetrics.DROPPED_MESSAGES ) ), - greaterThanOrEqualTo( 0L ), 5, TimeUnit.SECONDS ); + greaterThanOrEqualTo( 0L ), TIMEOUT, TimeUnit.SECONDS ); assertEventually( "queue size eventually accurate", () -> readLongValue( metricsCsv( coreServerMetricsDir, 
CoreMetrics.QUEUE_SIZE ) ), - greaterThanOrEqualTo( 0L ), 5, TimeUnit.SECONDS ); + greaterThanOrEqualTo( 0L ), TIMEOUT, TimeUnit.SECONDS ); } - private void assertAllNodesVisible( GraphDatabaseFacade db ) throws Exception + private void assertAllNodesVisible( GraphDatabaseAPI db ) throws Exception { try ( Transaction tx = db.beginTx() ) { @@ -171,7 +177,7 @@ private void assertAllNodesVisible( GraphDatabaseFacade db ) throws Exception Config config = db.getDependencyResolver().resolveDependency( Config.class ); assertEventually( "node to appear on core server " + config.get( raft_advertised_address ), nodeCount, - greaterThan( 0L ), 15, SECONDS ); + greaterThan( 0L ), TIMEOUT, SECONDS ); for ( Node node : db.getAllNodes() ) {