From a095933c7cd775ce45f268cd07cdb52988a05f06 Mon Sep 17 00:00:00 2001 From: Mark Needham Date: Fri, 15 Jan 2016 11:10:05 +0000 Subject: [PATCH] Rewrite the core edge monitoring code as decorators * Added integration test to check CSV files correctly populated * Added unit tests for monitoring wrapper * Removed initial integration test from MetricsKernelExtensionFactoryIT as it didn't fit in there. --- .../coreedge/raft/log/MonitoredRaftLog.java | 109 ++++++++++++++ .../raft/log/NaiveDurableRaftLog.java | 16 +-- .../coreedge/raft/log/debug/DumpRaftLog.java | 2 +- .../raft/state/term/MonitoredTermState.java | 51 +++++++ .../core/EnterpriseCoreEditionModule.java | 15 +- .../org/neo4j/coreedge/discovery/Cluster.java | 9 +- .../raft/log/MonitoredRaftLogTest.java | 101 +++++++++++++ .../log/NaiveDurableRaftLogContractTest.java | 2 +- .../raft/log/NaiveDurableRaftLogTest.java | 15 +- .../raft/log/RaftLogAdversarialTest.java | 2 +- .../raft/log/RaftLogDurabilityTest.java | 2 +- .../raft/log/debug/ReplayRaftLog.java | 2 +- .../state/term/MonitoredTermStateTest.java | 82 +++++++++++ .../scenarios/EdgeServerReplicationIT.java | 2 +- enterprise/metrics/pom.xml | 7 + .../org/neo4j/metrics/CoreEdgeMetricsIT.java | 136 ++++++++++++++++++ .../MetricsKernelExtensionFactoryIT.java | 41 ------ 17 files changed, 521 insertions(+), 73 deletions(-) create mode 100644 enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/MonitoredRaftLog.java create mode 100644 enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/state/term/MonitoredTermState.java create mode 100644 enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/log/MonitoredRaftLogTest.java create mode 100644 enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/state/term/MonitoredTermStateTest.java create mode 100644 enterprise/metrics/src/test/java/org/neo4j/metrics/CoreEdgeMetricsIT.java diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/MonitoredRaftLog.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/MonitoredRaftLog.java new file mode 100644 index 0000000000000..406c277854785 --- /dev/null +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/MonitoredRaftLog.java @@ -0,0 +1,109 @@ +/* + * Copyright (c) 2002-2016 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package org.neo4j.coreedge.raft.log; + +import org.neo4j.coreedge.raft.log.monitoring.RaftLogAppendIndexMonitor; +import org.neo4j.coreedge.raft.log.monitoring.RaftLogCommitIndexMonitor; +import org.neo4j.coreedge.raft.replication.ReplicatedContent; +import org.neo4j.kernel.monitoring.Monitors; + +public class MonitoredRaftLog implements RaftLog +{ + private final RaftLog delegate; + private final RaftLogAppendIndexMonitor appendIndexMonitor; + private final RaftLogCommitIndexMonitor commitIndexMonitor; + + public MonitoredRaftLog( RaftLog delegate, Monitors monitors ) + { + this.delegate = delegate; + this.appendIndexMonitor = monitors.newMonitor( RaftLogAppendIndexMonitor.class, getClass(), APPEND_INDEX_TAG ); + this.commitIndexMonitor = monitors.newMonitor( RaftLogCommitIndexMonitor.class, getClass(), COMMIT_INDEX_TAG ); + } + + @Override + public long append( RaftLogEntry entry ) throws RaftStorageException + { + long appendIndex = delegate.append( entry ); + appendIndexMonitor.appendIndex( appendIndex ); + return appendIndex; + } + + @Override + public void truncate( long fromIndex ) throws RaftStorageException + { + delegate.truncate( fromIndex ); + appendIndexMonitor.appendIndex( delegate.appendIndex() ); + } + + @Override + public void commit( long commitIndex ) throws RaftStorageException + { + delegate.commit( commitIndex ); + commitIndexMonitor.commitIndex( delegate.commitIndex() ); + } + + @Override + public void replay() throws Throwable + { + delegate.replay(); + } + + @Override + public void registerListener( Listener consumer ) + { + delegate.registerListener( consumer ); + } + + @Override + public long appendIndex() + { + return delegate.appendIndex(); + } + + @Override + public long commitIndex() + { + return delegate.commitIndex(); + } + + @Override + public RaftLogEntry readLogEntry( long logIndex ) throws RaftStorageException + { + return delegate.readLogEntry( logIndex ); + } + + @Override + public ReplicatedContent readEntryContent( long logIndex ) throws RaftStorageException + { + return delegate.readEntryContent( logIndex ); + } + + @Override + public long readEntryTerm( long logIndex ) throws RaftStorageException + { + return delegate.readEntryTerm( logIndex ); + } + + @Override + public boolean entryExists( long logIndex ) + { + return delegate.entryExists( logIndex ); + } +} diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/NaiveDurableRaftLog.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/NaiveDurableRaftLog.java index 90ed53af7d67e..225ebcee779d3 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/NaiveDurableRaftLog.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/NaiveDurableRaftLog.java @@ -35,6 +35,9 @@ import org.neo4j.kernel.lifecycle.LifecycleAdapter; import org.neo4j.kernel.monitoring.Monitors; +import static org.neo4j.coreedge.raft.log.RaftLog.*; + + /** * Writes a raft log to disk using 3 files: *

@@ -82,17 +85,9 @@ public class NaiveDurableRaftLog extends LifecycleAdapter implements RaftLog private long commitIndex = -1; private long term = -1; - private final RaftLogAppendIndexMonitor appendIndexMonitor; - private final RaftLogCommitIndexMonitor commitIndexMonitor; - - public NaiveDurableRaftLog( FileSystemAbstraction fileSystem, File directory, Serializer serializer, - Monitors monitors ) + public NaiveDurableRaftLog( FileSystemAbstraction fileSystem, File directory, Serializer serializer) { this.serializer = serializer; - this.appendIndexMonitor = monitors.newMonitor( RaftLogAppendIndexMonitor.class, getClass(), RaftLog - .APPEND_INDEX_TAG ); - this.commitIndexMonitor = monitors.newMonitor( RaftLogCommitIndexMonitor.class, getClass(), RaftLog - .COMMIT_INDEX_TAG ); directory.mkdirs(); @@ -198,7 +193,6 @@ public long append( RaftLogEntry logEntry ) throws RaftStorageException writeEntry( new Entry( logEntry.term(), contentOffset ) ); contentOffset += length; appendIndex++; - appendIndexMonitor.appendIndex(appendIndex); for ( Listener listener : listeners ) { listener.onAppended( logEntry.content(), appendIndex ); @@ -262,7 +256,6 @@ public void commit( final long newCommitIndex ) throws RaftStorageException try { storeCommitIndex( actualNewCommitIndex ); - commitIndexMonitor.commitIndex(actualNewCommitIndex); } catch ( IOException e ) { @@ -289,7 +282,6 @@ public long appendIndex() @Override public long commitIndex() { - commitIndexMonitor.commitIndex( commitIndex ); return commitIndex; } diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/debug/DumpRaftLog.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/debug/DumpRaftLog.java index 37f7b8b648816..c08c80f87d419 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/debug/DumpRaftLog.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/debug/DumpRaftLog.java @@ -36,7 +36,7 @@ public static void main( String[] args ) throws RaftStorageException File logDirectory = new File( arg ); System.out.println( "logDirectory = " + logDirectory ); NaiveDurableRaftLog log = new NaiveDurableRaftLog( new DefaultFileSystemAbstraction(), - logDirectory, new RaftContentSerializer(), new Monitors() ); + logDirectory, new RaftContentSerializer() ); new LogPrinter( log ).print( System.out ); System.out.println(); diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/state/term/MonitoredTermState.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/state/term/MonitoredTermState.java new file mode 100644 index 0000000000000..11b7146ad1083 --- /dev/null +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/state/term/MonitoredTermState.java @@ -0,0 +1,51 @@ +/* + * Copyright (c) 2002-2016 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package org.neo4j.coreedge.raft.state.term; + +import org.neo4j.coreedge.raft.log.RaftStorageException; +import org.neo4j.coreedge.raft.log.monitoring.RaftTermMonitor; +import org.neo4j.kernel.monitoring.Monitors; + +public class MonitoredTermState implements TermState +{ + private final TermState delegate; + private final RaftTermMonitor termMonitor; + + public MonitoredTermState( TermState delegate, Monitors monitors ) + { + this.delegate = delegate; + + this.termMonitor = monitors.newMonitor( RaftTermMonitor.class, getClass(), TERM_TAG ); + + } + + @Override + public long currentTerm() + { + return delegate.currentTerm(); + } + + @Override + public void update( long newTerm ) throws RaftStorageException + { + delegate.update( newTerm ); + termMonitor.term( newTerm ); + } +} diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/core/EnterpriseCoreEditionModule.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/core/EnterpriseCoreEditionModule.java index baf1f8ecc8d7c..46c0b7edf9658 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/core/EnterpriseCoreEditionModule.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/core/EnterpriseCoreEditionModule.java @@ -37,6 +37,7 @@ import org.neo4j.coreedge.raft.LeaderLocator; import org.neo4j.coreedge.raft.RaftInstance; import org.neo4j.coreedge.raft.RaftServer; +import org.neo4j.coreedge.raft.log.MonitoredRaftLog; import org.neo4j.coreedge.raft.log.NaiveDurableRaftLog; import org.neo4j.coreedge.raft.log.RaftLog; import org.neo4j.coreedge.raft.membership.CoreMemberSetBuilder; @@ -69,6 +70,7 @@ import org.neo4j.coreedge.raft.state.id_allocation.OnDiskIdAllocationState; import org.neo4j.coreedge.raft.state.membership.OnDiskRaftMembershipState; import org.neo4j.coreedge.raft.state.membership.RaftMembershipState; +import org.neo4j.coreedge.raft.state.term.MonitoredTermState; import org.neo4j.coreedge.raft.state.term.OnDiskTermState; import org.neo4j.coreedge.raft.state.term.TermState; import org.neo4j.coreedge.raft.state.vote.OnDiskVoteState; @@ -182,14 +184,17 @@ public EnterpriseCoreEditionModule( final PlatformModule platformModule, NaiveDurableRaftLog raftLog = life.add( new NaiveDurableRaftLog( fileSystem, new File( clusterStateDirectory, NaiveDurableRaftLog.DIRECTORY_NAME ), - new RaftContentSerializer(), platformModule.monitors ) ); + new RaftContentSerializer() ) ); + + MonitoredRaftLog monitoredRaftLog = new MonitoredRaftLog( raftLog, platformModule.monitors) ; TermState termState; try { - termState = life.add( new OnDiskTermState( fileSystem, + OnDiskTermState onDiskTermState = life.add( new OnDiskTermState( fileSystem, new File( clusterStateDirectory, OnDiskTermState.DIRECTORY_NAME ), config.get( CoreEdgeClusterSettings.term_state_size ), databaseHealthSupplier ) ); + termState = new MonitoredTermState( onDiskTermState, platformModule.monitors ); } catch ( IOException e ) { @@ -223,7 +228,7 @@ public EnterpriseCoreEditionModule( final PlatformModule platformModule, throw new RuntimeException( e ); } - raft = createRaft( life, loggingOutbound, discoveryService, config, messageLogger, raftLog, + raft = createRaft( life, loggingOutbound, discoveryService, config, messageLogger, monitoredRaftLog, termState, voteState, myself, logProvider, raftServer, raftTimeoutService, databaseHealthSupplier, raftMembershipState ); @@ -261,7 +266,7 @@ public EnterpriseCoreEditionModule( final PlatformModule platformModule, ReplicatedIdRangeAcquirer idRangeAcquirer = new ReplicatedIdRangeAcquirer( replicator, idAllocationStateMachine, 1024, 1000, myself, logProvider ); - raftLog.registerListener( replicator ); + monitoredRaftLog.registerListener( replicator ); long electionTimeout = config.get( CoreEdgeClusterSettings.leader_election_timeout ); MembershipWaiter membershipWaiter = @@ -320,7 +325,7 @@ public EnterpriseCoreEditionModule( final PlatformModule platformModule, config.get( CoreEdgeClusterSettings.transaction_listen_address ) ); life.add( CoreServerStartupProcess.createLifeSupport( - platformModule.dataSourceManager, replicatedIdGeneratorFactory, raft, new RaftLogReplay( raftLog, + platformModule.dataSourceManager, replicatedIdGeneratorFactory, raft, new RaftLogReplay( monitoredRaftLog, logProvider ), raftServer, catchupServer, raftTimeoutService, membershipWaiter, config.get( CoreEdgeClusterSettings.join_catch_up_timeout ), diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/discovery/Cluster.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/discovery/Cluster.java index 1bc54dbb6dfdf..d6036fd538b07 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/discovery/Cluster.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/discovery/Cluster.java @@ -100,18 +100,21 @@ private static File coreServerStoreDirectory( File parentDir, int serverId ) return new File( parentDir, "server-core-" + serverId ); } - public static File edgeSeverStoreDirectory( File parentDir, int serverId ) + public static File edgeServerStoreDirectory( File parentDir, int serverId ) { return new File( parentDir, "server-edge-" + serverId ); } - private static Map serverParams( String serverType, int serverId, String initialHosts ) + private Map serverParams( String serverType, int serverId, String initialHosts ) { Map params = stringMap(); params.put( "org.neo4j.server.database.mode", serverType ); params.put( ClusterSettings.cluster_name.name(), CLUSTER_NAME ); params.put( ClusterSettings.server_id.name(), String.valueOf( serverId ) ); params.put( CoreEdgeClusterSettings.initial_core_cluster_members.name(), initialHosts ); + params.put( "metrics.csv.enabled", "true" ); + params.put( "metrics.neo4j.network.enabled", "true" ); + params.put( "metrics.csv.path", "metrics" ); return params; } @@ -185,7 +188,7 @@ public CoreGraphDatabase startCoreServer( int serverId, int clusterSize, List addresses ) { - final File storeDir = edgeSeverStoreDirectory( parentDir, serverId ); + final File storeDir = edgeServerStoreDirectory( parentDir, serverId ); return startEdgeServer( serverId, storeDir, addresses ); } diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/log/MonitoredRaftLogTest.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/log/MonitoredRaftLogTest.java new file mode 100644 index 0000000000000..dc92dabd78a34 --- /dev/null +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/log/MonitoredRaftLogTest.java @@ -0,0 +1,101 @@ +/* + * Copyright (c) 2002-2016 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package org.neo4j.coreedge.raft.log; + +import java.io.File; + +import org.junit.Test; + +import org.neo4j.coreedge.raft.ReplicatedInteger; +import org.neo4j.coreedge.raft.log.monitoring.RaftLogAppendIndexMonitor; +import org.neo4j.coreedge.raft.log.monitoring.RaftLogCommitIndexMonitor; +import org.neo4j.graphdb.mockfs.EphemeralFileSystemAbstraction; +import org.neo4j.io.fs.FileSystemAbstraction; +import org.neo4j.kernel.monitoring.Monitors; + +import static org.junit.Assert.*; + +public class MonitoredRaftLogTest +{ + @Test + public void shouldMonitorAppendIndexAndCommitIndex() throws Exception + { + // Given + FileSystemAbstraction fsa = new EphemeralFileSystemAbstraction(); + + File directory = new File( "." ); + fsa.create( directory ); + + Monitors monitors = new Monitors(); + StubRaftLogAppendIndexMonitor appendMonitor = new StubRaftLogAppendIndexMonitor(); + monitors.addMonitorListener( appendMonitor ); + + StubRaftLogCommitIndexMonitor commitMonitor = new StubRaftLogCommitIndexMonitor(); + monitors.addMonitorListener( commitMonitor ); + + MonitoredRaftLog log = new MonitoredRaftLog( + new NaiveDurableRaftLog( fsa, directory, new DummyRaftableContentSerializer() ), monitors ); + + // When + log.append( new RaftLogEntry( 0, ReplicatedInteger.valueOf( 1 ) ) ); + log.append( new RaftLogEntry( 0, ReplicatedInteger.valueOf( 1 ) ) ); + log.commit( 0 ); + + assertEquals( 1, appendMonitor.appendIndex() ); + assertEquals( 0, commitMonitor.commitIndex() ); + + log.truncate( 1 ); + assertEquals( 0, appendMonitor.appendIndex() ); + } + + private static class StubRaftLogCommitIndexMonitor implements RaftLogCommitIndexMonitor + { + private long commitIndex; + + @Override + public long commitIndex() + { + return commitIndex; + } + + @Override + public void commitIndex( long commitIndex ) + { + this.commitIndex = commitIndex; + } + } + + private static class StubRaftLogAppendIndexMonitor implements RaftLogAppendIndexMonitor + { + private long appendIndex; + + @Override + public long appendIndex() + { + return appendIndex; + } + + @Override + public void appendIndex( long appendIndex ) + { + this.appendIndex = appendIndex; + } + } +} \ No newline at end of file diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/log/NaiveDurableRaftLogContractTest.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/log/NaiveDurableRaftLogContractTest.java index 69236be83a68d..3b86849c45cc5 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/log/NaiveDurableRaftLogContractTest.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/log/NaiveDurableRaftLogContractTest.java @@ -35,6 +35,6 @@ public RaftLog createRaftLog() throws IOException File directory = new File( "raft-log" ); fileSystem.mkdir( directory ); - return new NaiveDurableRaftLog( fileSystem, directory, new DummyRaftableContentSerializer(), new Monitors() ); + return new NaiveDurableRaftLog( fileSystem, directory, new DummyRaftableContentSerializer() ); } } diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/log/NaiveDurableRaftLogTest.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/log/NaiveDurableRaftLogTest.java index d3c7899e06757..8a4b5355d61d1 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/log/NaiveDurableRaftLogTest.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/log/NaiveDurableRaftLogTest.java @@ -38,10 +38,16 @@ import org.junit.Test; import org.mockito.Matchers; import org.neo4j.coreedge.raft.ReplicatedInteger; +import org.neo4j.coreedge.raft.log.monitoring.RaftLogAppendIndexMonitor; +import org.neo4j.coreedge.raft.log.monitoring.RaftLogCommitIndexMonitor; +import org.neo4j.graphdb.mockfs.EphemeralFileSystemAbstraction; import org.neo4j.io.fs.FileSystemAbstraction; import org.neo4j.io.fs.StoreFileChannel; import org.neo4j.kernel.monitoring.Monitors; +import static org.neo4j.coreedge.raft.log.RaftLog.APPEND_INDEX_TAG; +import static org.neo4j.coreedge.raft.log.RaftLog.COMMIT_INDEX_TAG; + public class NaiveDurableRaftLogTest { @Test @@ -63,8 +69,7 @@ public void shouldCallWriteAllWhenStoringEntries() throws Exception when( fsa.open( Matchers.eq( contentFile ), anyString() ) ).thenReturn( contentChannel ); when( fsa.open( Matchers.eq( commitFile ), anyString() ) ).thenReturn( commitChannel ); - NaiveDurableRaftLog log = new NaiveDurableRaftLog( fsa, directory, new DummyRaftableContentSerializer(), new - Monitors() ); + NaiveDurableRaftLog log = new NaiveDurableRaftLog( fsa, directory, new DummyRaftableContentSerializer()); // When log.append( new RaftLogEntry( 0, ReplicatedInteger.valueOf( 1 ) ) ); @@ -102,8 +107,7 @@ public void shouldForceAndCloseFilesOnShutdown() throws Throwable when( fsa.open( Matchers.eq( contentFile ), anyString() ) ).thenReturn( contentChannel ); when( fsa.open( Matchers.eq( commitFile ), anyString() ) ).thenReturn( commitChannel ); - NaiveDurableRaftLog log = new NaiveDurableRaftLog( fsa, directory, new DummyRaftableContentSerializer(), new - Monitors() ); + NaiveDurableRaftLog log = new NaiveDurableRaftLog( fsa, directory, new DummyRaftableContentSerializer()); // When log.shutdown(); @@ -144,8 +148,7 @@ public void shouldForceAndCloseFilesOnShutdownEvenOnFailure() throws Throwable when( fsa.open( Matchers.eq( contentFile ), anyString() ) ).thenReturn( contentChannel ); when( fsa.open( Matchers.eq( commitFile ), anyString() ) ).thenReturn( commitChannel ); - NaiveDurableRaftLog log = new NaiveDurableRaftLog( fsa, directory, new DummyRaftableContentSerializer(), new - Monitors() ); + NaiveDurableRaftLog log = new NaiveDurableRaftLog( fsa, directory, new DummyRaftableContentSerializer()); // When try diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/log/RaftLogAdversarialTest.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/log/RaftLogAdversarialTest.java index 4f3e951138c55..0360a47e4cd3e 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/log/RaftLogAdversarialTest.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/log/RaftLogAdversarialTest.java @@ -43,7 +43,7 @@ public RaftLog createRaftLog( FileSystemAbstraction fileSystem ) { File directory = new File( "raft-log" ); fileSystem.mkdir( directory ); - return new NaiveDurableRaftLog( fileSystem, directory, new DummyRaftableContentSerializer(), new Monitors() ); + return new NaiveDurableRaftLog( fileSystem, directory, new DummyRaftableContentSerializer()); } @Test diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/log/RaftLogDurabilityTest.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/log/RaftLogDurabilityTest.java index 826d50cab2914..e264329c87796 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/log/RaftLogDurabilityTest.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/log/RaftLogDurabilityTest.java @@ -38,7 +38,7 @@ public RaftLog createRaftLog( EphemeralFileSystemAbstraction fileSystem ) { File directory = new File( "raft-log" ); fileSystem.mkdir( directory ); - return new NaiveDurableRaftLog( fileSystem, directory, new DummyRaftableContentSerializer(), new Monitors() ); + return new NaiveDurableRaftLog( fileSystem, directory, new DummyRaftableContentSerializer() ); } @Test diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/log/debug/ReplayRaftLog.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/log/debug/ReplayRaftLog.java index 765e791f84129..577dca230a417 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/log/debug/ReplayRaftLog.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/log/debug/ReplayRaftLog.java @@ -49,7 +49,7 @@ public static void main( String[] args ) throws RaftStorageException, IOExceptio File logDirectory = new File( from ); System.out.println( "logDirectory = " + logDirectory ); NaiveDurableRaftLog log = new NaiveDurableRaftLog( new DefaultFileSystemAbstraction(), - logDirectory, new RaftContentSerializer(), new Monitors() ); + logDirectory, new RaftContentSerializer() ); long totalCommittedEntries = log.commitIndex(); diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/state/term/MonitoredTermStateTest.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/state/term/MonitoredTermStateTest.java new file mode 100644 index 0000000000000..82bc5b4f7bc49 --- /dev/null +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/state/term/MonitoredTermStateTest.java @@ -0,0 +1,82 @@ +/* + * Copyright (c) 2002-2016 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package org.neo4j.coreedge.raft.state.term; + +import org.junit.Test; + +import org.neo4j.coreedge.raft.log.RaftStorageException; +import org.neo4j.coreedge.raft.log.monitoring.RaftTermMonitor; +import org.neo4j.kernel.monitoring.Monitors; + +import static org.junit.Assert.assertEquals; + +public class MonitoredTermStateTest +{ + @Test + public void shouldMonitorTerm() throws Exception + { + // given + + Monitors monitors = new Monitors(); + StubRaftTermMonitor raftTermMonitor = new StubRaftTermMonitor(); + monitors.addMonitorListener( raftTermMonitor ); + + // when + new MonitoredTermState( new StubTermState(), monitors ).update( 7 ); + + // then + assertEquals(7, raftTermMonitor.term()); + } + + + private static class StubTermState implements TermState + { + private long currentTerm; + + @Override + public long currentTerm() + { + return currentTerm; + } + + @Override + public void update( long newTerm ) throws RaftStorageException + { + currentTerm = newTerm; + } + } + + private static class StubRaftTermMonitor implements RaftTermMonitor + { + private long term; + + @Override + public long term() + { + return term; + } + + @Override + public void term( long term ) + { + this.term = term; + } + } +} \ No newline at end of file diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/scenarios/EdgeServerReplicationIT.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/scenarios/EdgeServerReplicationIT.java index b63425a15fadf..d409e5e2a0b51 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/scenarios/EdgeServerReplicationIT.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/scenarios/EdgeServerReplicationIT.java @@ -187,7 +187,7 @@ private File createExistingEdgeStore( String path ) dir.mkdirs(); GraphDatabaseService db = new TestGraphDatabaseFactory() - .newEmbeddedDatabase( Cluster.edgeSeverStoreDirectory( dir, 1966 ) ); + .newEmbeddedDatabase( Cluster.edgeServerStoreDirectory( dir, 1966 ) ); try ( Transaction tx = db.beginTx() ) { diff --git a/enterprise/metrics/pom.xml b/enterprise/metrics/pom.xml index c69b934b49974..d64b283e59ff0 100644 --- a/enterprise/metrics/pom.xml +++ b/enterprise/metrics/pom.xml @@ -155,6 +155,13 @@ test test-jar + + org.neo4j + neo4j-core-edge + ${project.version} + test + test-jar + io.dropwizard.metrics diff --git a/enterprise/metrics/src/test/java/org/neo4j/metrics/CoreEdgeMetricsIT.java b/enterprise/metrics/src/test/java/org/neo4j/metrics/CoreEdgeMetricsIT.java new file mode 100644 index 0000000000000..8d696c8d1c0c2 --- /dev/null +++ b/enterprise/metrics/src/test/java/org/neo4j/metrics/CoreEdgeMetricsIT.java @@ -0,0 +1,136 @@ +/* + * Copyright (c) 2002-2016 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package org.neo4j.metrics; + +import java.io.BufferedReader; +import java.io.File; +import java.io.FileReader; +import java.io.IOException; + +import org.junit.Rule; +import org.junit.Test; + +import org.neo4j.coreedge.server.core.CoreGraphDatabase; +import org.neo4j.function.ThrowingSupplier; +import org.neo4j.graphdb.GraphDatabaseService; +import org.neo4j.graphdb.Node; +import org.neo4j.graphdb.Transaction; +import org.neo4j.kernel.configuration.Config; +import org.neo4j.metrics.source.CoreEdgeMetrics; +import org.neo4j.test.TargetDirectory; +import org.neo4j.tooling.GlobalGraphOperations; + +import static java.util.concurrent.TimeUnit.SECONDS; + +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.Matchers.greaterThan; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThat; + +import static org.neo4j.coreedge.server.CoreEdgeClusterSettings.raft_advertised_address; +import static org.neo4j.graphdb.Label.label; +import static org.neo4j.helpers.collection.Iterables.count; +import static org.neo4j.test.Assert.assertEventually; + + +import org.neo4j.coreedge.discovery.Cluster; + +public class CoreEdgeMetricsIT +{ + @Rule + public final TargetDirectory.TestDirectory dir = TargetDirectory.testDirForTest( getClass() ); + + @Test + public void shouldReplicateTransactionToCoreServers() throws Exception + { + // given + File dbDir = dir.directory(); + Cluster cluster = Cluster.start( dbDir, 3, 0 ); + + // when + GraphDatabaseService coreDB = cluster.findLeader( 5000 ); + + try ( Transaction tx = coreDB.beginTx() ) + { + Node node = coreDB.createNode( label( "boo" ) ); + node.setProperty( "foobar", "baz_bat" ); + tx.success(); + } + + // then + for ( final CoreGraphDatabase db : cluster.coreServers() ) + { + try ( Transaction tx = db.beginTx() ) + { + ThrowingSupplier nodeCount = () -> count( db.getAllNodes() ); + + Config config = db.getDependencyResolver().resolveDependency( Config.class ); + + assertEventually( "node to appear on core server " + config.get( raft_advertised_address ), nodeCount, + greaterThan( 0L ), 15, SECONDS ); + + for ( Node node : GlobalGraphOperations.at( db ).getAllNodes() ) + { + assertEquals( "baz_bat", node.getProperty( "foobar" ) ); + } + + tx.success(); + } + } + + File appendMetrics = metricsCsv( dbDir, CoreEdgeMetrics.APPEND_INDEX ); + assertThat( readLastValue( appendMetrics ), greaterThan( 0L ) ); + + File commitMetrics = metricsCsv( dbDir, CoreEdgeMetrics.COMMIT_INDEX ); + assertThat( readLastValue( commitMetrics ), greaterThan( 0L ) ); + + File termMetrics = metricsCsv( dbDir, CoreEdgeMetrics.TERM ); + assertThat( readLastValue( termMetrics ), greaterThan( 0L ) ); + + cluster.shutdown(); + } + + private File metricsCsv( File dbDir, String metric ) + { + File csvFile = new File( dbDir, "/server-core-0/metrics/" + metric + ".csv" ); + assertEventually( "Metrics file should exist", csvFile::exists, is( true ), 20, SECONDS ); + return csvFile; + } + + private static final int TIME_STAMP = 0; + private static final int METRICS_VALUE = 1; + + + private long readLastValue( File metricFile ) throws IOException + { + try ( BufferedReader reader = new BufferedReader( new FileReader( metricFile ) ) ) + { + String[] headers = reader.readLine().split( "," ); + assertThat( headers.length, is( 2 ) ); + assertThat( headers[TIME_STAMP], is( "t" ) ); + assertThat( headers[METRICS_VALUE], is( "value" ) ); + + String line = reader.readLine(); + String[] fields = line.split( "," ); + return Long.valueOf( fields[METRICS_VALUE] ); + } + } + +} diff --git a/enterprise/metrics/src/test/java/org/neo4j/metrics/MetricsKernelExtensionFactoryIT.java b/enterprise/metrics/src/test/java/org/neo4j/metrics/MetricsKernelExtensionFactoryIT.java index 871f87ada9892..4c348cd444ace 100644 --- a/enterprise/metrics/src/test/java/org/neo4j/metrics/MetricsKernelExtensionFactoryIT.java +++ b/enterprise/metrics/src/test/java/org/neo4j/metrics/MetricsKernelExtensionFactoryIT.java @@ -173,47 +173,6 @@ public void shouldUseEventBasedReportingCorrectly() throws Throwable readLongValueAndAssert( metricFile, ( newValue, currentValue ) -> newValue > 0 ); } - @Test - public void shouldLogCoreEdgeMetrics() throws Throwable - { - createCluster( "10m", "100ms" ); - - Monitors monitors = db.getDependencyResolver().resolveDependency( Monitors.class ); - - NaiveDurableRaftLog log = new NaiveDurableRaftLog( new DefaultFileSystemAbstraction(), db.getStoreDirectory(), - new RaftContentSerializer(), monitors ); - - CoreMember owner = new CoreMember( new AdvertisedSocketAddress( "localhost:7001" ), new - AdvertisedSocketAddress( "localhost:6001" ) ); - log.append( new RaftLogEntry( 8, allocation( owner, 0 ) ) ); - log.append( new RaftLogEntry( 8, allocation( owner, 1024 ) ) ); - log.append( new RaftLogEntry( 8, allocation( owner, 2048 ) ) ); - log.commit( 2 ); - - - File appenIndexMetricsFile = new File( outputPath, CoreEdgeMetrics.APPEND_INDEX + ".csv" ); - while ( !appenIndexMetricsFile.exists() ) - { - Thread.sleep( 10 ); - } - - File commitIndexMetricsFile = new File( outputPath, CoreEdgeMetrics.COMMIT_INDEX + ".csv" ); - while ( !commitIndexMetricsFile.exists() ) - { - Thread.sleep( 10 ); - } - - cluster.stop(); - - readLongValueAndAssert( appenIndexMetricsFile, ( newValue, currentValue ) -> newValue > 0 ); - readLongValueAndAssert( commitIndexMetricsFile, ( newValue, currentValue ) -> newValue > 0 ); - } - - private ReplicatedIdAllocationRequest allocation( CoreMember owner, int start ) - { - return new ReplicatedIdAllocationRequest( owner, RELATIONSHIP_TYPE_TOKEN, start, 1024 ); - } - private void addNodes( int numberOfNodes ) { for ( int i = 0; i < numberOfNodes; i++ )