diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/CatchupServer.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/CatchupServer.java index 4a5e504cf320b..63c1703c67f51 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/CatchupServer.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/CatchupServer.java @@ -51,12 +51,14 @@ import org.neo4j.kernel.impl.transaction.log.TransactionIdStore; import org.neo4j.kernel.impl.transaction.log.checkpoint.CheckPointer; import org.neo4j.kernel.lifecycle.LifecycleAdapter; +import org.neo4j.kernel.monitoring.Monitors; import org.neo4j.logging.Log; import org.neo4j.logging.LogProvider; public class CatchupServer extends LifecycleAdapter { private final LogProvider logProvider; + private Monitors monitors; private final Supplier storeIdSupplier; private final Supplier transactionIdStoreSupplier; @@ -77,13 +79,14 @@ public CatchupServer( LogProvider logProvider, Supplier logicalTransactionStoreSupplier, Supplier dataSourceSupplier, Supplier checkPointerSupplier, - ListenSocketAddress listenAddress ) + ListenSocketAddress listenAddress, Monitors monitors ) { this.listenAddress = listenAddress; this.transactionIdStoreSupplier = transactionIdStoreSupplier; this.storeIdSupplier = storeIdSupplier; this.logicalTransactionStoreSupplier = logicalTransactionStoreSupplier; this.logProvider = logProvider; + this.monitors = monitors; this.log = logProvider.getLog( getClass() ); this.dataSourceSupplier = dataSourceSupplier; this.checkPointerSupplier = checkPointerSupplier; @@ -125,7 +128,8 @@ protected void initChannel( SocketChannel ch ) throws Exception pipeline.addLast( new TxPullRequestDecoder( protocol ) ); pipeline.addLast( new TxPullRequestHandler( protocol, storeIdSupplier, - transactionIdStoreSupplier, logicalTransactionStoreSupplier ) ); + transactionIdStoreSupplier, logicalTransactionStoreSupplier, + monitors) ); pipeline.addLast( new 
ChunkedWriteHandler() ); pipeline.addLast( new GetStoreRequestDecoder( protocol ) ); diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/tx/core/TxPullRequestHandler.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/tx/core/TxPullRequestHandler.java index 7ab3b7c5442d4..609b1c8df9120 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/tx/core/TxPullRequestHandler.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/tx/core/TxPullRequestHandler.java @@ -34,6 +34,7 @@ import org.neo4j.kernel.impl.transaction.CommittedTransactionRepresentation; import org.neo4j.kernel.impl.transaction.log.LogicalTransactionStore; import org.neo4j.kernel.impl.transaction.log.TransactionIdStore; +import org.neo4j.kernel.monitoring.Monitors; public class TxPullRequestHandler extends SimpleChannelInboundHandler { @@ -41,16 +42,19 @@ public class TxPullRequestHandler extends SimpleChannelInboundHandler storeIdSupplier, Supplier transactionIdStoreSupplier, - Supplier logicalTransactionStoreSupplier ) + Supplier logicalTransactionStoreSupplier, + Monitors monitors ) { this.protocol = protocol; this.storeId = storeIdSupplier.get(); this.transactionIdStore = transactionIdStoreSupplier.get(); this.logicalTransactionStore = logicalTransactionStoreSupplier.get(); + this.monitor = monitors.newMonitor( TxPullRequestsMonitor.class ); } @Override @@ -79,6 +83,7 @@ protected void channelRead0( ChannelHandlerContext ctx, final TxPullRequest msg ctx.write( new TxStreamFinishedResponse( endTxId ) ); ctx.flush(); + monitor.increment(); protocol.expect( NextMessage.MESSAGE_TYPE ); } diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/tx/core/TxPullRequestsMonitor.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/tx/core/TxPullRequestsMonitor.java new file mode 100644 index 0000000000000..339ed57bc8b71 --- /dev/null +++ 
b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/tx/core/TxPullRequestsMonitor.java @@ -0,0 +1,26 @@ +/* + * Copyright (c) 2002-2016 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ +package org.neo4j.coreedge.catchup.tx.core; + +public interface TxPullRequestsMonitor +{ + long txPullRequestsReceived(); + void increment(); +} diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/tx/core/TxRetryMonitor.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/tx/core/TxRetryMonitor.java new file mode 100644 index 0000000000000..cabac380cdbff --- /dev/null +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/tx/core/TxRetryMonitor.java @@ -0,0 +1,26 @@ +/* + * Copyright (c) 2002-2016 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. 
+ * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ +package org.neo4j.coreedge.catchup.tx.core; + +public interface TxRetryMonitor +{ + long transactionsRetries(); + void retry(); +} diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/CoreEdgeMetaData.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/CoreEdgeMetaData.java new file mode 100644 index 0000000000000..8c270f4deacff --- /dev/null +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/CoreEdgeMetaData.java @@ -0,0 +1,25 @@ +/* + * Copyright (c) 2002-2016 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. 
+ */ +package org.neo4j.coreedge.raft; + +public interface CoreEdgeMetaData +{ + boolean isLeader(); +} diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/CoreMetaData.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/CoreMetaData.java new file mode 100644 index 0000000000000..0bd15ff7becea --- /dev/null +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/CoreMetaData.java @@ -0,0 +1,25 @@ +/* + * Copyright (c) 2002-2016 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ +package org.neo4j.coreedge.raft; + +public interface CoreMetaData +{ + boolean isLeader(); +} diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/LeaderNotFoundMonitor.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/LeaderNotFoundMonitor.java new file mode 100644 index 0000000000000..d551cfe9b9de4 --- /dev/null +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/LeaderNotFoundMonitor.java @@ -0,0 +1,26 @@ +/* + * Copyright (c) 2002-2016 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. 
+ * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ +package org.neo4j.coreedge.raft; + +public interface LeaderNotFoundMonitor +{ + long leaderNotFoundExceptions(); + void increment(); +} diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/RaftInstance.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/RaftInstance.java index 198b1a2dfefeb..8d9732c7bf078 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/RaftInstance.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/RaftInstance.java @@ -40,6 +40,7 @@ import org.neo4j.coreedge.raft.state.vote.VoteState; import org.neo4j.helpers.Clock; import org.neo4j.kernel.internal.DatabaseHealth; +import org.neo4j.kernel.monitoring.Monitors; import org.neo4j.logging.Log; import org.neo4j.logging.LogProvider; @@ -68,8 +69,11 @@ * * @param <MEMBER> The membership type. 
*/ -public class RaftInstance implements LeaderLocator, Inbound.MessageHandler +public class RaftInstance implements LeaderLocator, Inbound.MessageHandler, CoreMetaData { + + private final LeaderNotFoundMonitor leaderNotFoundMonitor; + public enum Timeouts implements RenewableTimeoutService.TimeoutName { ELECTION, HEARTBEAT @@ -103,7 +107,7 @@ public RaftInstance( MEMBER myself, TermState termState, VoteState voteS LogProvider logProvider, RaftMembershipManager membershipManager, RaftLogShippingManager logShipping, Supplier databaseHealthSupplier, - Clock clock ) + Clock clock, Monitors monitors ) { this.myself = myself; @@ -124,6 +128,8 @@ public RaftInstance( MEMBER myself, TermState termState, VoteState voteS this.state = new RaftState<>( myself, termState, membershipManager, entryLog, voteState ); + leaderNotFoundMonitor = monitors.newMonitor( LeaderNotFoundMonitor.class ); + initTimers(); inbound.registerHandler( this ); @@ -191,6 +197,7 @@ public MEMBER getLeader() throws NoLeaderTimeoutException if ( state.leader() == null ) { + leaderNotFoundMonitor.increment(); throw new NoLeaderTimeoutException(); } @@ -270,6 +277,7 @@ public synchronized void handle( Serializable incomingMessage ) } } + @Override public boolean isLeader() { return currentRole == LEADER; diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/replication/tx/ReplicatedTransactionCommitProcess.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/replication/tx/ReplicatedTransactionCommitProcess.java index 7566a07fd09ef..9e1c4b71d7210 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/replication/tx/ReplicatedTransactionCommitProcess.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/replication/tx/ReplicatedTransactionCommitProcess.java @@ -23,6 +23,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import org.neo4j.coreedge.catchup.tx.core.TxRetryMonitor; import 
org.neo4j.coreedge.raft.replication.Replicator; import org.neo4j.coreedge.raft.replication.Replicator.ReplicationFailedException; import org.neo4j.coreedge.raft.replication.session.LocalSessionPool; @@ -33,6 +34,7 @@ import org.neo4j.kernel.impl.logging.LogService; import org.neo4j.kernel.impl.transaction.tracing.CommitEvent; import org.neo4j.kernel.lifecycle.LifecycleAdapter; +import org.neo4j.kernel.monitoring.Monitors; import org.neo4j.logging.Log; import org.neo4j.storageengine.api.TransactionApplicationMode; @@ -47,11 +49,12 @@ public class ReplicatedTransactionCommitProcess extends LifecycleAdapter impleme private final LocalSessionPool sessionPool; private final Log log; private final CommittingTransactions txFutures; + private final TxRetryMonitor txRetryMonitor; public ReplicatedTransactionCommitProcess( Replicator replicator, LocalSessionPool sessionPool, Replicator.ReplicatedContentListener replicatedTxListener, long retryIntervalMillis, LogService logging, - CommittingTransactions txFutures ) + CommittingTransactions txFutures, Monitors monitors) { this.sessionPool = sessionPool; this.replicatedTxListener = replicatedTxListener; @@ -59,6 +62,7 @@ public ReplicatedTransactionCommitProcess( Replicator replicator, LocalSessionPo this.retryIntervalMillis = retryIntervalMillis; this.log = logging.getInternalLog( getClass() ); this.txFutures = txFutures; + txRetryMonitor = monitors.newMonitor( TxRetryMonitor.class ); replicator.subscribe( this.replicatedTxListener ); } @@ -99,6 +103,7 @@ public long commit( final TransactionToApply tx, } log.warn( "Transaction replication failed, but a previous attempt may have succeeded," + "so commit process must keep waiting for possible success.", e ); + txRetryMonitor.retry(); } try @@ -112,11 +117,13 @@ public long commit( final TransactionToApply tx, { interrupted = true; log.info( "Replication of %s was interrupted; retrying.", operationContext ); + txRetryMonitor.retry(); } catch ( TimeoutException e ) { log.info( 
"Replication of %s timed out after %d %s; retrying.", operationContext, retryIntervalMillis, TimeUnit.MILLISECONDS ); + txRetryMonitor.retry(); } } } diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/core/EnterpriseCoreEditionModule.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/core/EnterpriseCoreEditionModule.java index c06587da1dc7a..00e0a485eef3a 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/core/EnterpriseCoreEditionModule.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/core/EnterpriseCoreEditionModule.java @@ -98,6 +98,7 @@ import org.neo4j.kernel.NeoStoreDataSource; import org.neo4j.kernel.Version; import org.neo4j.kernel.configuration.Config; +import org.neo4j.kernel.ha.cluster.member.ClusterMembers; import org.neo4j.kernel.impl.api.CommitProcessFactory; import org.neo4j.kernel.impl.api.SchemaWriteGuard; import org.neo4j.kernel.impl.api.TransactionHeaderInformation; @@ -120,6 +121,7 @@ import org.neo4j.kernel.lifecycle.LifeSupport; import org.neo4j.kernel.lifecycle.Lifecycle; import org.neo4j.kernel.lifecycle.LifecycleStatus; +import org.neo4j.kernel.monitoring.Monitors; import org.neo4j.logging.LogProvider; import org.neo4j.udc.UsageData; @@ -230,7 +232,11 @@ public EnterpriseCoreEditionModule( final PlatformModule platformModule, raft = createRaft( life, loggingOutbound, discoveryService, config, messageLogger, monitoredRaftLog, termState, voteState, myself, logProvider, raftServer, raftTimeoutService, - databaseHealthSupplier, raftMembershipState ); + databaseHealthSupplier, raftMembershipState, platformModule.monitors ); + + dependencies.satisfyDependency( raft ); + + dependencies.satisfyDependency( raft ); RaftReplicator replicator = new RaftReplicator<>( raft, myself, new RaftOutbound( loggingOutbound ) ); @@ -241,7 +247,7 @@ public EnterpriseCoreEditionModule( final PlatformModule platformModule, new ReplicatedLockTokenStateMachine<>( replicator ); 
commitProcessFactory = createCommitProcessFactory( replicator, localSessionPool, replicatedLockTokenStateMachine, - dependencies, logging ); + dependencies, logging, platformModule.monitors ); final IdAllocationState idAllocationState; try @@ -322,7 +328,8 @@ public EnterpriseCoreEditionModule( final PlatformModule platformModule, platformModule.dependencies.provideDependency( LogicalTransactionStore.class ), new DataSourceSupplier( platformModule ), new CheckpointerSupplier( platformModule.dependencies ), - config.get( CoreEdgeClusterSettings.transaction_listen_address ) ); + config.get( CoreEdgeClusterSettings.transaction_listen_address ), + platformModule.monitors); life.add( CoreServerStartupProcess.createLifeSupport( platformModule.dataSourceManager, replicatedIdGeneratorFactory, raft, new RaftLogReplay( monitoredRaftLog, @@ -361,7 +368,7 @@ public static CommitProcessFactory createCommitProcessFactory( final Replicator final LockTokenManager currentReplicatedLockState, final Dependencies dependencies, - final LogService logging ) + final LogService logging, Monitors monitors ) { return ( appender, applier, config ) -> { TransactionRepresentationCommitProcess localCommit = @@ -380,7 +387,7 @@ public static CommitProcessFactory createCommitProcessFactory( final Replicator return new ReplicatedTransactionCommitProcess( replicator, localSessionPool, replicatedTxStateMachine, config.get( CoreEdgeClusterSettings.tx_replication_retry_interval ), - logging, committingTransactions + logging, committingTransactions, monitors ); }; } @@ -398,7 +405,8 @@ private static RaftInstance createRaft( LifeSupport life, RaftServer raftServer, DelayedRenewableTimeoutService raftTimeoutService, Supplier databaseHealthSupplier, - RaftMembershipState raftMembershipState ) + RaftMembershipState raftMembershipState, Monitors + monitors ) { LoggingInbound loggingRaftInbound = new LoggingInbound( raftServer, messageLogger, myself.getRaftAddress() ); @@ -427,7 +435,7 @@ private static 
RaftInstance createRaft( LifeSupport life, myself, termState, voteState, raftLog, electionTimeout, heartbeatInterval, raftTimeoutService, loggingRaftInbound, new RaftOutbound( outbound ), leaderWaitTimeout, logProvider, - raftMembershipManager, logShipping, databaseHealthSupplier, Clock.SYSTEM_CLOCK ); + raftMembershipManager, logShipping, databaseHealthSupplier, Clock.SYSTEM_CLOCK, monitors ); life.add( new RaftDiscoveryServiceConnector( discoveryService, raftInstance ) ); diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/RaftInstanceBuilder.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/RaftInstanceBuilder.java index 18faa03889a26..b36341c9d1fe4 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/RaftInstanceBuilder.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/RaftInstanceBuilder.java @@ -36,6 +36,7 @@ import org.neo4j.coreedge.raft.state.vote.VoteState; import org.neo4j.helpers.Clock; import org.neo4j.kernel.internal.DatabaseHealth; +import org.neo4j.kernel.monitoring.Monitors; import org.neo4j.logging.LogProvider; import org.neo4j.logging.NullLogProvider; @@ -69,6 +70,7 @@ public class RaftInstanceBuilder private int maxAllowedShippingLag = 256; private Supplier databaseHealthSupplier; private InMemoryRaftMembershipState raftMembership = new InMemoryRaftMembershipState<>(); + private Monitors monitors = new Monitors(); public RaftInstanceBuilder( MEMBER member, int expectedClusterSize, RaftGroup.Builder memberSetBuilder ) { @@ -89,7 +91,7 @@ public RaftInstance build() return new RaftInstance<>( member, termState, voteState, raftLog, electionTimeout, heartbeatInterval, renewableTimeoutService, inbound, outbound, leaderWaitTimeout, logProvider, membershipManager, - logShipping, databaseHealthSupplier, clock ); + logShipping, databaseHealthSupplier, clock, monitors ); } public RaftInstanceBuilder leaderWaitTimeout( long leaderWaitTimeout ) @@ -133,4 +135,10 @@ public 
RaftInstanceBuilder clock( Clock clock ) this.clock = clock; return this; } + + public RaftInstanceBuilder monitors( Monitors monitors ) + { + this.monitors = monitors; + return this; + } } diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/RaftInstanceTest.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/RaftInstanceTest.java index 2de4dc7cb6756..9daa13fecdbf1 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/RaftInstanceTest.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/RaftInstanceTest.java @@ -38,6 +38,7 @@ import org.neo4j.kernel.KernelEventHandlers; import org.neo4j.kernel.impl.core.DatabasePanicEventGenerator; import org.neo4j.kernel.internal.DatabaseHealth; +import org.neo4j.kernel.monitoring.Monitors; import org.neo4j.logging.NullLog; import static org.hamcrest.CoreMatchers.equalTo; @@ -458,6 +459,44 @@ public void shouldPanicWhenFailingToHandleMessageUnderNormalConditions() throws assertTrue( databaseHealth.hasPanicked() ); } + @Test + public void shouldMonitorLeaderNotFound() throws Exception + { + // Given + ControlledRenewableTimeoutService timeouts = new ControlledRenewableTimeoutService(); + + int leaderWaitTimeout = 10000; + Clock clock = new TickingClock( 0, leaderWaitTimeout + 1, TimeUnit.MILLISECONDS ); + + Monitors monitors = new Monitors(); + LeaderNotFoundMonitor leaderNotFoundMonitor = new StubLeaderNotFoundMonitor(); + monitors.addMonitorListener( leaderNotFoundMonitor ); + + + RaftInstance raft = new RaftInstanceBuilder<>( myself, 3, RaftTestMemberSetBuilder.INSTANCE ) + .timeoutService( timeouts ) + .clock( clock ) + .leaderWaitTimeout( leaderWaitTimeout ) + .monitors(monitors) + .build(); + + raft.bootstrapWithInitialMembers( new RaftTestGroup( asSet( myself, member1, member2 ) ) ); // @logIndex=0 + + try + { + // When + // There is no leader + raft.getLeader(); + fail( "Should have thrown exception" ); + } + // Then + catch ( NoLeaderTimeoutException e ) 
+ { + // expected + assertEquals(1, leaderNotFoundMonitor.leaderNotFoundExceptions()); + } + } + private static class ExplodingRaftLog implements RaftLog { private boolean startExploding = false; @@ -564,4 +603,21 @@ public boolean hasPanicked() return hasPanicked; } } + + private class StubLeaderNotFoundMonitor implements LeaderNotFoundMonitor + { + long count = 0; + + @Override + public long leaderNotFoundExceptions() + { + return count; + } + + @Override + public void increment() + { + count++; + } + } } diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/log/NaiveDurableRaftLogTest.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/log/NaiveDurableRaftLogTest.java index 8a4b5355d61d1..6604eb3df5a25 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/log/NaiveDurableRaftLogTest.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/log/NaiveDurableRaftLogTest.java @@ -19,6 +19,17 @@ */ package org.neo4j.coreedge.raft.log; +import java.io.File; +import java.io.IOException; +import java.nio.ByteBuffer; + +import org.junit.Test; +import org.mockito.Matchers; + +import org.neo4j.coreedge.raft.ReplicatedInteger; +import org.neo4j.io.fs.FileSystemAbstraction; +import org.neo4j.io.fs.StoreFileChannel; + import static org.junit.Assert.assertEquals; import static org.junit.Assert.fail; import static org.mockito.Matchers.any; @@ -31,23 +42,6 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -import java.io.File; -import java.io.IOException; -import java.nio.ByteBuffer; - -import org.junit.Test; -import org.mockito.Matchers; -import org.neo4j.coreedge.raft.ReplicatedInteger; -import org.neo4j.coreedge.raft.log.monitoring.RaftLogAppendIndexMonitor; -import org.neo4j.coreedge.raft.log.monitoring.RaftLogCommitIndexMonitor; -import org.neo4j.graphdb.mockfs.EphemeralFileSystemAbstraction; -import org.neo4j.io.fs.FileSystemAbstraction; -import 
org.neo4j.io.fs.StoreFileChannel; -import org.neo4j.kernel.monitoring.Monitors; - -import static org.neo4j.coreedge.raft.log.RaftLog.APPEND_INDEX_TAG; -import static org.neo4j.coreedge.raft.log.RaftLog.COMMIT_INDEX_TAG; - public class NaiveDurableRaftLogTest { @Test diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/replication/tx/CommitProcessStateMachineCollaborationTest.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/replication/tx/CommitProcessStateMachineCollaborationTest.java index 045814765eee8..1265bce1b319d 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/replication/tx/CommitProcessStateMachineCollaborationTest.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/replication/tx/CommitProcessStateMachineCollaborationTest.java @@ -38,6 +38,7 @@ import org.neo4j.kernel.impl.api.TransactionToApply; import org.neo4j.kernel.impl.logging.NullLogService; import org.neo4j.kernel.impl.transaction.TransactionRepresentation; +import org.neo4j.kernel.monitoring.Monitors; import static junit.framework.TestCase.fail; import static org.junit.Assert.assertEquals; @@ -71,7 +72,7 @@ public void shouldAlwaysCompleteFutureEvenIfReplicationHappensAtUnfortunateMomen ReplicatedTransactionCommitProcess commitProcess = new ReplicatedTransactionCommitProcess( replicator, sessionPool, stateMachine, 100, - NullLogService.getInstance(), txFutures ); + NullLogService.getInstance(), txFutures, new Monitors() ); // when commitProcess.commit( tx(), NULL, INTERNAL ); @@ -100,7 +101,7 @@ public void shouldFailTransactionIfLockSessionChanges() throws Exception ReplicatedTransactionCommitProcess commitProcess = new ReplicatedTransactionCommitProcess( replicator, sessionPool, stateMachine, 1, - NullLogService.getInstance(), txFutures ); + NullLogService.getInstance(), txFutures, new Monitors() ); // when try diff --git 
a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/replication/tx/ReplicatedTransactionCommitProcessTest.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/replication/tx/ReplicatedTransactionCommitProcessTest.java index 805b94a769edd..c136502687a96 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/replication/tx/ReplicatedTransactionCommitProcessTest.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/replication/tx/ReplicatedTransactionCommitProcessTest.java @@ -38,6 +38,7 @@ import org.neo4j.kernel.impl.api.TransactionToApply; import org.neo4j.kernel.impl.logging.NullLogService; import org.neo4j.kernel.impl.transaction.TransactionRepresentation; +import org.neo4j.kernel.monitoring.Monitors; import static org.junit.Assert.assertEquals; import static org.junit.Assert.fail; @@ -75,7 +76,7 @@ public void shouldReplicateOnlyOnceIfFirstAttemptSuccessful() throws Exception // when new ReplicatedTransactionCommitProcess( replicator, new LocalSessionPool( coreMember ), - transactionStateMachine, 1, NullLogService.getInstance(), txFutures ) + transactionStateMachine, 1, NullLogService.getInstance(), txFutures, new Monitors() ) .commit( tx(), NULL, INTERNAL ); // then @@ -100,7 +101,7 @@ public void shouldRetryReplicationIfFirstAttemptTimesOut() throws Exception // when new ReplicatedTransactionCommitProcess( replicator, new LocalSessionPool( coreMember ), - transactionStateMachine, 1, NullLogService.getInstance(), txFutures ) + transactionStateMachine, 1, NullLogService.getInstance(), txFutures, new Monitors() ) .commit( tx(), NULL, INTERNAL ); // then @@ -114,7 +115,9 @@ public void shouldAbortIfFirstReplicationAttemptFails() throws Exception Replicator replicator = mock( Replicator.class ); doThrow( Replicator.ReplicationFailedException.class ).when( replicator ).replicate( any( ReplicatedContent.class) ); - TransactionCommitProcess commitProcess = new ReplicatedTransactionCommitProcess( replicator, new 
LocalSessionPool( coreMember ), mock( Replicator.ReplicatedContentListener.class), 0, NullLogService.getInstance(), mock(CommittingTransactions.class) ); + TransactionCommitProcess commitProcess = new ReplicatedTransactionCommitProcess( replicator, + new LocalSessionPool( coreMember ), mock( Replicator.ReplicatedContentListener.class), + 0, NullLogService.getInstance(), mock(CommittingTransactions.class), new Monitors() ); // when try diff --git a/enterprise/metrics/pom.xml b/enterprise/metrics/pom.xml index d64b283e59ff0..262b049ce9b3d 100644 --- a/enterprise/metrics/pom.xml +++ b/enterprise/metrics/pom.xml @@ -63,7 +63,7 @@ - + diff --git a/enterprise/metrics/src/main/java/org/neo4j/metrics/source/CoreEdgeMetrics.java b/enterprise/metrics/src/main/java/org/neo4j/metrics/source/CoreMetrics.java similarity index 56% rename from enterprise/metrics/src/main/java/org/neo4j/metrics/source/CoreEdgeMetrics.java rename to enterprise/metrics/src/main/java/org/neo4j/metrics/source/CoreMetrics.java index 63fcb1b6722f0..9e02d0c801bbd 100644 --- a/enterprise/metrics/src/main/java/org/neo4j/metrics/source/CoreEdgeMetrics.java +++ b/enterprise/metrics/src/main/java/org/neo4j/metrics/source/CoreMetrics.java @@ -20,23 +20,20 @@ package org.neo4j.metrics.source; import java.io.IOException; +import java.util.function.Supplier; import com.codahale.metrics.Gauge; import com.codahale.metrics.MetricRegistry; -import org.neo4j.coreedge.raft.log.NaiveDurableRaftLog; +import org.neo4j.coreedge.raft.CoreMetaData; import org.neo4j.kernel.impl.annotations.Documented; import org.neo4j.kernel.lifecycle.LifecycleAdapter; import org.neo4j.kernel.monitoring.Monitors; import static com.codahale.metrics.MetricRegistry.name; -import static org.neo4j.coreedge.raft.log.RaftLog.APPEND_INDEX_TAG; -import static org.neo4j.coreedge.raft.log.RaftLog.COMMIT_INDEX_TAG; -import static org.neo4j.coreedge.raft.state.term.TermState.TERM_TAG; - @Documented(".Core Edge Metrics") -public class CoreEdgeMetrics 
extends LifecycleAdapter +public class CoreMetrics extends LifecycleAdapter { private static final String CORE_EDGE_PREFIX = "neo4j.core_edge"; @@ -46,18 +43,31 @@ public class CoreEdgeMetrics extends LifecycleAdapter public static final String COMMIT_INDEX = name( CORE_EDGE_PREFIX, "commit_index" ); @Documented("RAFT Term of this server") public static final String TERM = name( CORE_EDGE_PREFIX, "term" ); + @Documented("Leader was not found while attempting to commit a transaction") + public static final String LEADER_NOT_FOUND = name( CORE_EDGE_PREFIX, "leader_not_found" ); + @Documented("TX pull requests received from edge servers") + public static final String TX_PULL_REQUESTS_RECEIVED = name( CORE_EDGE_PREFIX, "tx_pull_requests_received" ); + @Documented("Transaction retries") + public static final String TX_RETRIES = name( CORE_EDGE_PREFIX, "tx_retries" ); + @Documented("Is this server the leader?") + public static final String IS_LEADER = name( CORE_EDGE_PREFIX, "is_leader" ); private Monitors monitors; private MetricRegistry registry; + private Supplier coreMetaData; private final RaftLogCommitIndexMetric raftLogCommitIndexMetric = new RaftLogCommitIndexMetric(); private final RaftLogAppendIndexMetric raftLogAppendIndexMetric = new RaftLogAppendIndexMetric(); private final RaftTermMetric raftTermMetric = new RaftTermMetric(); + private final LeaderNotFoundMetric leaderNotFoundMetric = new LeaderNotFoundMetric(); + private final TxPullRequestsMetric txPullRequestsMetric = new TxPullRequestsMetric(); + private final TxRetryMetric txRetryMetric = new TxRetryMetric(); - public CoreEdgeMetrics( Monitors monitors, MetricRegistry registry ) + public CoreMetrics( Monitors monitors, MetricRegistry registry, Supplier coreMetaData ) { this.monitors = monitors; this.registry = registry; + this.coreMetaData = coreMetaData; } @Override @@ -66,10 +76,17 @@ public void start() throws Throwable monitors.addMonitorListener( raftLogCommitIndexMetric ); 
monitors.addMonitorListener( raftLogAppendIndexMetric ); monitors.addMonitorListener( raftTermMetric ); + monitors.addMonitorListener( leaderNotFoundMetric ); + monitors.addMonitorListener( txPullRequestsMetric ); + monitors.addMonitorListener( txRetryMetric ); registry.register( COMMIT_INDEX, (Gauge) raftLogCommitIndexMetric::commitIndex ); registry.register( APPEND_INDEX, (Gauge) raftLogAppendIndexMetric::appendIndex ); registry.register( TERM, (Gauge) raftTermMetric::term ); + registry.register( LEADER_NOT_FOUND, (Gauge) leaderNotFoundMetric::leaderNotFoundExceptions ); + registry.register( TX_PULL_REQUESTS_RECEIVED, (Gauge) txPullRequestsMetric::txPullRequestsReceived ); + registry.register( TX_RETRIES, (Gauge) txRetryMetric::transactionsRetries ); + registry.register( IS_LEADER, new LeaderGauge() ); } @Override @@ -78,9 +95,25 @@ public void stop() throws IOException registry.remove( COMMIT_INDEX ); registry.remove( APPEND_INDEX ); registry.remove( TERM ); + registry.remove( LEADER_NOT_FOUND ); + registry.remove( TX_PULL_REQUESTS_RECEIVED ); + registry.remove( TX_RETRIES ); + registry.remove( IS_LEADER ); monitors.removeMonitorListener( raftLogCommitIndexMetric ); monitors.removeMonitorListener( raftLogAppendIndexMetric ); monitors.removeMonitorListener( raftTermMetric ); + monitors.removeMonitorListener( leaderNotFoundMetric ); + monitors.removeMonitorListener( txPullRequestsMetric ); + monitors.removeMonitorListener( txRetryMetric ); + } + /** Gauge registered under IS_LEADER: each getValue() call asks the coreMetaData supplier for the current metadata and reports 1 when isLeader() is true, otherwise 0. NOTE(review): assumes the supplier never yields null - confirm. */ + private class LeaderGauge implements Gauge + { + @Override + public Integer getValue() + { + return coreMetaData.get().isLeader() ? 
1 : 0; + } + } } diff --git a/enterprise/metrics/src/main/java/org/neo4j/metrics/source/LeaderNotFoundMetric.java b/enterprise/metrics/src/main/java/org/neo4j/metrics/source/LeaderNotFoundMetric.java new file mode 100644 index 0000000000000..99dd2efb6f5e3 --- /dev/null +++ b/enterprise/metrics/src/main/java/org/neo4j/metrics/source/LeaderNotFoundMetric.java @@ -0,0 +1,42 @@ +/* + * Copyright (c) 2002-2016 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . 
+ */ +package org.neo4j.metrics.source; + +import java.util.concurrent.atomic.AtomicLong; + +import org.neo4j.coreedge.raft.LeaderNotFoundMonitor; + +/** LeaderNotFoundMonitor implementation backed by an atomic counter: increment() adds one and leaderNotFoundExceptions() reads the running total. The counter reference is final, so it is never reassigned after construction. */ +public class LeaderNotFoundMetric implements LeaderNotFoundMonitor +{ + private final AtomicLong count = new AtomicLong( 0 ); + /** Returns the current value of the counter. */ + @Override + public long leaderNotFoundExceptions() + { + return count.get(); + } + /** Atomically adds one to the counter. */ + @Override + public void increment() + { + count.incrementAndGet(); + } +} diff --git a/enterprise/metrics/src/main/java/org/neo4j/metrics/source/Neo4jMetricsBuilder.java b/enterprise/metrics/src/main/java/org/neo4j/metrics/source/Neo4jMetricsBuilder.java index 00bd9862e8882..19ba2a21beb5b 100644 --- a/enterprise/metrics/src/main/java/org/neo4j/metrics/source/Neo4jMetricsBuilder.java +++ b/enterprise/metrics/src/main/java/org/neo4j/metrics/source/Neo4jMetricsBuilder.java @@ -19,10 +19,12 @@ */ package org.neo4j.metrics.source; +import java.util.function.Supplier; + import com.codahale.metrics.MetricRegistry; -import java.util.function.Supplier; +import org.neo4j.coreedge.raft.CoreMetaData; import org.neo4j.io.pagecache.monitoring.PageCacheCounters; import org.neo4j.kernel.configuration.Config; import org.neo4j.kernel.ha.cluster.member.ClusterMembers; @@ -78,6 +80,8 @@ public interface Dependencies StoreEntityCounters entityCountStats(); Supplier clusterMembers(); + + Supplier raft(); } public Neo4jMetricsBuilder( MetricRegistry registry, EventReporter reporter, Config config, LogService logService, @@ -180,8 +184,17 @@ public boolean build() if ( config.get( MetricsSettings.coreEdgeEnabled ) ) { - life.add( new CoreEdgeMetrics( dependencies.monitors(), registry ) ); - result = true; + OperationalMode mode = kernelContext.databaseInfo().operationalMode; + if ( mode == OperationalMode.core) + { + life.add( new CoreMetrics( dependencies.monitors(), registry, dependencies.raft() ) ); + result = true; + } + else + { + logService.getUserLog( getClass() ) + .warn( "Core Edge metrics was enabled but the graph database 
is not in Core/Edge mode." ); + } } return result; diff --git a/enterprise/metrics/src/main/java/org/neo4j/metrics/source/RaftLogCommitIndexMetric.java b/enterprise/metrics/src/main/java/org/neo4j/metrics/source/RaftLogCommitIndexMetric.java index 986f146df6157..140c07b54066b 100644 --- a/enterprise/metrics/src/main/java/org/neo4j/metrics/source/RaftLogCommitIndexMetric.java +++ b/enterprise/metrics/src/main/java/org/neo4j/metrics/source/RaftLogCommitIndexMetric.java @@ -21,6 +21,7 @@ import java.util.concurrent.atomic.AtomicLong; +import org.neo4j.coreedge.raft.LeaderNotFoundMonitor; import org.neo4j.coreedge.raft.log.monitoring.RaftLogCommitIndexMonitor; public class RaftLogCommitIndexMetric implements RaftLogCommitIndexMonitor diff --git a/enterprise/metrics/src/main/java/org/neo4j/metrics/source/TxPullRequestsMetric.java b/enterprise/metrics/src/main/java/org/neo4j/metrics/source/TxPullRequestsMetric.java new file mode 100644 index 0000000000000..a361989ede215 --- /dev/null +++ b/enterprise/metrics/src/main/java/org/neo4j/metrics/source/TxPullRequestsMetric.java @@ -0,0 +1,43 @@ +/* + * Copyright (c) 2002-2016 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . 
+ */ +package org.neo4j.metrics.source; + +import java.util.concurrent.atomic.AtomicLong; + +import org.neo4j.coreedge.catchup.tx.core.TxPullRequestsMonitor; +import org.neo4j.coreedge.raft.LeaderNotFoundMonitor; /* NOTE(review): not referenced anywhere in this class; candidate for removal */ + +/** TxPullRequestsMonitor implementation backed by an atomic counter: increment() adds one and txPullRequestsReceived() reads the running total. The counter reference is final, so it is never reassigned after construction. */ +public class TxPullRequestsMetric implements TxPullRequestsMonitor +{ + private final AtomicLong count = new AtomicLong( 0 ); + /** Returns the current value of the counter. */ + @Override + public long txPullRequestsReceived() + { + return count.get(); + } + /** Atomically adds one to the counter. */ + @Override + public void increment() + { + count.incrementAndGet(); + } +} diff --git a/enterprise/metrics/src/main/java/org/neo4j/metrics/source/TxRetryMetric.java b/enterprise/metrics/src/main/java/org/neo4j/metrics/source/TxRetryMetric.java new file mode 100644 index 0000000000000..0685a0817e975 --- /dev/null +++ b/enterprise/metrics/src/main/java/org/neo4j/metrics/source/TxRetryMetric.java @@ -0,0 +1,43 @@ +/* + * Copyright (c) 2002-2016 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . 
+ */ +package org.neo4j.metrics.source; + +import java.util.concurrent.atomic.AtomicLong; + +import org.neo4j.coreedge.catchup.tx.core.TxPullRequestsMonitor; /* NOTE(review): not referenced anywhere in this class; candidate for removal */ +import org.neo4j.coreedge.catchup.tx.core.TxRetryMonitor; + +/** TxRetryMonitor implementation backed by an atomic counter: retry() adds one and transactionsRetries() reads the running total. The counter reference is final, so it is never reassigned after construction. */ +public class TxRetryMetric implements TxRetryMonitor +{ + private final AtomicLong count = new AtomicLong( 0 ); + /** Returns the current value of the counter. */ + @Override + public long transactionsRetries() + { + return count.get(); + } + /** Atomically adds one to the counter. */ + @Override + public void retry() + { + count.incrementAndGet(); + } +} diff --git a/enterprise/metrics/src/test/java/org/neo4j/metrics/CoreEdgeMetricsIT.java b/enterprise/metrics/src/test/java/org/neo4j/metrics/CoreEdgeMetricsIT.java index 8d696c8d1c0c2..6d36d94f21e3a 100644 --- a/enterprise/metrics/src/test/java/org/neo4j/metrics/CoreEdgeMetricsIT.java +++ b/enterprise/metrics/src/test/java/org/neo4j/metrics/CoreEdgeMetricsIT.java @@ -28,19 +28,23 @@ import org.junit.Test; import org.neo4j.coreedge.server.core.CoreGraphDatabase; +import org.neo4j.coreedge.server.edge.EdgeGraphDatabase; import org.neo4j.function.ThrowingSupplier; import org.neo4j.graphdb.GraphDatabaseService; import org.neo4j.graphdb.Node; import org.neo4j.graphdb.Transaction; import org.neo4j.kernel.configuration.Config; -import org.neo4j.metrics.source.CoreEdgeMetrics; +import org.neo4j.kernel.impl.factory.GraphDatabaseFacade; +import org.neo4j.metrics.source.CoreMetrics; import org.neo4j.test.TargetDirectory; import org.neo4j.tooling.GlobalGraphOperations; import static java.util.concurrent.TimeUnit.SECONDS; +import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThat; @@ -58,11 +62,11 @@ public class CoreEdgeMetricsIT public final TargetDirectory.TestDirectory dir = TargetDirectory.testDirForTest( getClass() ); @Test - public void 
shouldReplicateTransactionToCoreServers() throws Exception + public void shouldMonitorCoreEdge() throws Exception { // given File dbDir = dir.directory(); - Cluster cluster = Cluster.start( dbDir, 3, 0 ); + Cluster cluster = Cluster.start( dbDir, 3, 1 ); // when GraphDatabaseService coreDB = cluster.findLeader( 5000 ); @@ -77,36 +81,58 @@ public void shouldReplicateTransactionToCoreServers() throws Exception // then for ( final CoreGraphDatabase db : cluster.coreServers() ) { - try ( Transaction tx = db.beginTx() ) - { - ThrowingSupplier nodeCount = () -> count( db.getAllNodes() ); - - Config config = db.getDependencyResolver().resolveDependency( Config.class ); - - assertEventually( "node to appear on core server " + config.get( raft_advertised_address ), nodeCount, - greaterThan( 0L ), 15, SECONDS ); - - for ( Node node : GlobalGraphOperations.at( db ).getAllNodes() ) - { - assertEquals( "baz_bat", node.getProperty( "foobar" ) ); - } + assertAllNodesVisible(db); + } - tx.success(); - } + for ( final EdgeGraphDatabase db : cluster.edgeServers() ) + { + assertAllNodesVisible( db ); } - File appendMetrics = metricsCsv( dbDir, CoreEdgeMetrics.APPEND_INDEX ); + File appendMetrics = metricsCsv( dbDir, CoreMetrics.APPEND_INDEX ); assertThat( readLastValue( appendMetrics ), greaterThan( 0L ) ); - File commitMetrics = metricsCsv( dbDir, CoreEdgeMetrics.COMMIT_INDEX ); + File commitMetrics = metricsCsv( dbDir, CoreMetrics.COMMIT_INDEX ); assertThat( readLastValue( commitMetrics ), greaterThan( 0L ) ); - File termMetrics = metricsCsv( dbDir, CoreEdgeMetrics.TERM ); + File termMetrics = metricsCsv( dbDir, CoreMetrics.TERM ); assertThat( readLastValue( termMetrics ), greaterThan( 0L ) ); + File leaderNotFoundMetrics = metricsCsv( dbDir, CoreMetrics.LEADER_NOT_FOUND ); + assertThat( readLastValue( leaderNotFoundMetrics ), equalTo( 0L ) ); + + File txPullRequestsMetrics = metricsCsv( dbDir, CoreMetrics.TX_PULL_REQUESTS_RECEIVED ); + assertThat( readLastValue( 
txPullRequestsMetrics ), greaterThan( 0L ) ); + + File txRetryMetrics = metricsCsv( dbDir, CoreMetrics.TX_RETRIES ); + assertThat( readLastValue( txRetryMetrics ), equalTo( 0L ) ); + /* NOTE(review): the IS_LEADER gauge reports 0 or 1, so the greaterThanOrEqualTo( 0L ) check below holds for either value and mainly shows that the CSV could be read - consider a stricter matcher */ + File isLeaderMetrics = metricsCsv( dbDir, CoreMetrics.IS_LEADER ); + assertThat( readLastValue( isLeaderMetrics ), greaterThanOrEqualTo ( 0L ) ); + cluster.shutdown(); } + /** Opens a transaction on db, waits via assertEventually (15, SECONDS) for the node count to exceed zero, then asserts that every node's foobar property equals baz_bat. Called for core and edge databases alike, hence the server-neutral failure message. */ private void assertAllNodesVisible( GraphDatabaseFacade db ) throws Exception + { + try ( Transaction tx = db.beginTx() ) + { + ThrowingSupplier nodeCount = () -> count( db.getAllNodes() ); + + Config config = db.getDependencyResolver().resolveDependency( Config.class ); + + assertEventually( "node to appear on server " + config.get( raft_advertised_address ), nodeCount, + greaterThan( 0L ), 15, SECONDS ); + + for ( Node node : GlobalGraphOperations.at( db ).getAllNodes() ) + { + assertEquals( "baz_bat", node.getProperty( "foobar" ) ); + } + + tx.success(); + } + } + private File metricsCsv( File dbDir, String metric ) { File csvFile = new File( dbDir, "/server-core-0/metrics/" + metric + ".csv" );