Skip to content

Commit

Permalink
Rewrite the core edge monitoring code as decorators
Browse files Browse the repository at this point in the history
* Added integration test to check CSV files correctly populated
* Added unit tests for monitoring wrapper
* Removed initial integration test from MetricsKernelExtensionFactoryIT
  as it didn't fit in there.
  • Loading branch information
Mark Needham committed Jan 16, 2016
1 parent cadf7e9 commit a095933
Show file tree
Hide file tree
Showing 17 changed files with 521 additions and 73 deletions.
@@ -0,0 +1,109 @@
/*
* Copyright (c) 2002-2016 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.coreedge.raft.log;

import org.neo4j.coreedge.raft.log.monitoring.RaftLogAppendIndexMonitor;
import org.neo4j.coreedge.raft.log.monitoring.RaftLogCommitIndexMonitor;
import org.neo4j.coreedge.raft.replication.ReplicatedContent;
import org.neo4j.kernel.monitoring.Monitors;

public class MonitoredRaftLog implements RaftLog
{
private final RaftLog delegate;
private final RaftLogAppendIndexMonitor appendIndexMonitor;
private final RaftLogCommitIndexMonitor commitIndexMonitor;

public MonitoredRaftLog( RaftLog delegate, Monitors monitors )
{
this.delegate = delegate;
this.appendIndexMonitor = monitors.newMonitor( RaftLogAppendIndexMonitor.class, getClass(), APPEND_INDEX_TAG );
this.commitIndexMonitor = monitors.newMonitor( RaftLogCommitIndexMonitor.class, getClass(), COMMIT_INDEX_TAG );
}

@Override
public long append( RaftLogEntry entry ) throws RaftStorageException
{
long appendIndex = delegate.append( entry );
appendIndexMonitor.appendIndex( appendIndex );
return appendIndex;
}

@Override
public void truncate( long fromIndex ) throws RaftStorageException
{
delegate.truncate( fromIndex );
appendIndexMonitor.appendIndex( delegate.appendIndex() );
}

@Override
public void commit( long commitIndex ) throws RaftStorageException
{
delegate.commit( commitIndex );
commitIndexMonitor.commitIndex( delegate.commitIndex() );
}

@Override
public void replay() throws Throwable
{
delegate.replay();
}

@Override
public void registerListener( Listener consumer )
{
delegate.registerListener( consumer );
}

@Override
public long appendIndex()
{
return delegate.appendIndex();
}

@Override
public long commitIndex()
{
return delegate.commitIndex();
}

@Override
public RaftLogEntry readLogEntry( long logIndex ) throws RaftStorageException
{
return delegate.readLogEntry( logIndex );
}

@Override
public ReplicatedContent readEntryContent( long logIndex ) throws RaftStorageException
{
return delegate.readEntryContent( logIndex );
}

@Override
public long readEntryTerm( long logIndex ) throws RaftStorageException
{
return delegate.readEntryTerm( logIndex );
}

@Override
public boolean entryExists( long logIndex )
{
return delegate.entryExists( logIndex );
}
}
Expand Up @@ -35,6 +35,9 @@
import org.neo4j.kernel.lifecycle.LifecycleAdapter; import org.neo4j.kernel.lifecycle.LifecycleAdapter;
import org.neo4j.kernel.monitoring.Monitors; import org.neo4j.kernel.monitoring.Monitors;


import static org.neo4j.coreedge.raft.log.RaftLog.*;


/** /**
* Writes a raft log to disk using 3 files: * Writes a raft log to disk using 3 files:
* <p> * <p>
Expand Down Expand Up @@ -82,17 +85,9 @@ public class NaiveDurableRaftLog extends LifecycleAdapter implements RaftLog
private long commitIndex = -1; private long commitIndex = -1;
private long term = -1; private long term = -1;


private final RaftLogAppendIndexMonitor appendIndexMonitor; public NaiveDurableRaftLog( FileSystemAbstraction fileSystem, File directory, Serializer serializer)
private final RaftLogCommitIndexMonitor commitIndexMonitor;

public NaiveDurableRaftLog( FileSystemAbstraction fileSystem, File directory, Serializer serializer,
Monitors monitors )
{ {
this.serializer = serializer; this.serializer = serializer;
this.appendIndexMonitor = monitors.newMonitor( RaftLogAppendIndexMonitor.class, getClass(), RaftLog
.APPEND_INDEX_TAG );
this.commitIndexMonitor = monitors.newMonitor( RaftLogCommitIndexMonitor.class, getClass(), RaftLog
.COMMIT_INDEX_TAG );


directory.mkdirs(); directory.mkdirs();


Expand Down Expand Up @@ -198,7 +193,6 @@ public long append( RaftLogEntry logEntry ) throws RaftStorageException
writeEntry( new Entry( logEntry.term(), contentOffset ) ); writeEntry( new Entry( logEntry.term(), contentOffset ) );
contentOffset += length; contentOffset += length;
appendIndex++; appendIndex++;
appendIndexMonitor.appendIndex(appendIndex);
for ( Listener listener : listeners ) for ( Listener listener : listeners )
{ {
listener.onAppended( logEntry.content(), appendIndex ); listener.onAppended( logEntry.content(), appendIndex );
Expand Down Expand Up @@ -262,7 +256,6 @@ public void commit( final long newCommitIndex ) throws RaftStorageException
try try
{ {
storeCommitIndex( actualNewCommitIndex ); storeCommitIndex( actualNewCommitIndex );
commitIndexMonitor.commitIndex(actualNewCommitIndex);
} }
catch ( IOException e ) catch ( IOException e )
{ {
Expand All @@ -289,7 +282,6 @@ public long appendIndex()
@Override @Override
public long commitIndex() public long commitIndex()
{ {
commitIndexMonitor.commitIndex( commitIndex );
return commitIndex; return commitIndex;
} }


Expand Down
Expand Up @@ -36,7 +36,7 @@ public static void main( String[] args ) throws RaftStorageException
File logDirectory = new File( arg ); File logDirectory = new File( arg );
System.out.println( "logDirectory = " + logDirectory ); System.out.println( "logDirectory = " + logDirectory );
NaiveDurableRaftLog log = new NaiveDurableRaftLog( new DefaultFileSystemAbstraction(), NaiveDurableRaftLog log = new NaiveDurableRaftLog( new DefaultFileSystemAbstraction(),
logDirectory, new RaftContentSerializer(), new Monitors() ); logDirectory, new RaftContentSerializer() );


new LogPrinter( log ).print( System.out ); new LogPrinter( log ).print( System.out );
System.out.println(); System.out.println();
Expand Down
@@ -0,0 +1,51 @@
/*
* Copyright (c) 2002-2016 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.coreedge.raft.state.term;

import org.neo4j.coreedge.raft.log.RaftStorageException;
import org.neo4j.coreedge.raft.log.monitoring.RaftTermMonitor;
import org.neo4j.kernel.monitoring.Monitors;

public class MonitoredTermState implements TermState
{
private final TermState delegate;
private final RaftTermMonitor termMonitor;

public MonitoredTermState( TermState delegate, Monitors monitors )
{
this.delegate = delegate;

this.termMonitor = monitors.newMonitor( RaftTermMonitor.class, getClass(), TERM_TAG );

}

@Override
public long currentTerm()
{
return delegate.currentTerm();
}

@Override
public void update( long newTerm ) throws RaftStorageException
{
delegate.update( newTerm );
termMonitor.term( newTerm );
}
}
Expand Up @@ -37,6 +37,7 @@
import org.neo4j.coreedge.raft.LeaderLocator; import org.neo4j.coreedge.raft.LeaderLocator;
import org.neo4j.coreedge.raft.RaftInstance; import org.neo4j.coreedge.raft.RaftInstance;
import org.neo4j.coreedge.raft.RaftServer; import org.neo4j.coreedge.raft.RaftServer;
import org.neo4j.coreedge.raft.log.MonitoredRaftLog;
import org.neo4j.coreedge.raft.log.NaiveDurableRaftLog; import org.neo4j.coreedge.raft.log.NaiveDurableRaftLog;
import org.neo4j.coreedge.raft.log.RaftLog; import org.neo4j.coreedge.raft.log.RaftLog;
import org.neo4j.coreedge.raft.membership.CoreMemberSetBuilder; import org.neo4j.coreedge.raft.membership.CoreMemberSetBuilder;
Expand Down Expand Up @@ -69,6 +70,7 @@
import org.neo4j.coreedge.raft.state.id_allocation.OnDiskIdAllocationState; import org.neo4j.coreedge.raft.state.id_allocation.OnDiskIdAllocationState;
import org.neo4j.coreedge.raft.state.membership.OnDiskRaftMembershipState; import org.neo4j.coreedge.raft.state.membership.OnDiskRaftMembershipState;
import org.neo4j.coreedge.raft.state.membership.RaftMembershipState; import org.neo4j.coreedge.raft.state.membership.RaftMembershipState;
import org.neo4j.coreedge.raft.state.term.MonitoredTermState;
import org.neo4j.coreedge.raft.state.term.OnDiskTermState; import org.neo4j.coreedge.raft.state.term.OnDiskTermState;
import org.neo4j.coreedge.raft.state.term.TermState; import org.neo4j.coreedge.raft.state.term.TermState;
import org.neo4j.coreedge.raft.state.vote.OnDiskVoteState; import org.neo4j.coreedge.raft.state.vote.OnDiskVoteState;
Expand Down Expand Up @@ -182,14 +184,17 @@ public EnterpriseCoreEditionModule( final PlatformModule platformModule,


NaiveDurableRaftLog raftLog = life.add( new NaiveDurableRaftLog( fileSystem, NaiveDurableRaftLog raftLog = life.add( new NaiveDurableRaftLog( fileSystem,
new File( clusterStateDirectory, NaiveDurableRaftLog.DIRECTORY_NAME ), new File( clusterStateDirectory, NaiveDurableRaftLog.DIRECTORY_NAME ),
new RaftContentSerializer(), platformModule.monitors ) ); new RaftContentSerializer() ) );

MonitoredRaftLog monitoredRaftLog = new MonitoredRaftLog( raftLog, platformModule.monitors) ;


TermState termState; TermState termState;
try try
{ {
termState = life.add( new OnDiskTermState( fileSystem, OnDiskTermState onDiskTermState = life.add( new OnDiskTermState( fileSystem,
new File( clusterStateDirectory, OnDiskTermState.DIRECTORY_NAME ), new File( clusterStateDirectory, OnDiskTermState.DIRECTORY_NAME ),
config.get( CoreEdgeClusterSettings.term_state_size ), databaseHealthSupplier ) ); config.get( CoreEdgeClusterSettings.term_state_size ), databaseHealthSupplier ) );
termState = new MonitoredTermState( onDiskTermState, platformModule.monitors );
} }
catch ( IOException e ) catch ( IOException e )
{ {
Expand Down Expand Up @@ -223,7 +228,7 @@ public EnterpriseCoreEditionModule( final PlatformModule platformModule,
throw new RuntimeException( e ); throw new RuntimeException( e );
} }


raft = createRaft( life, loggingOutbound, discoveryService, config, messageLogger, raftLog, raft = createRaft( life, loggingOutbound, discoveryService, config, messageLogger, monitoredRaftLog,
termState, voteState, myself, logProvider, raftServer, raftTimeoutService, termState, voteState, myself, logProvider, raftServer, raftTimeoutService,
databaseHealthSupplier, raftMembershipState ); databaseHealthSupplier, raftMembershipState );


Expand Down Expand Up @@ -261,7 +266,7 @@ public EnterpriseCoreEditionModule( final PlatformModule platformModule,
ReplicatedIdRangeAcquirer idRangeAcquirer = new ReplicatedIdRangeAcquirer( replicator, ReplicatedIdRangeAcquirer idRangeAcquirer = new ReplicatedIdRangeAcquirer( replicator,
idAllocationStateMachine, 1024, 1000, myself, logProvider ); idAllocationStateMachine, 1024, 1000, myself, logProvider );


raftLog.registerListener( replicator ); monitoredRaftLog.registerListener( replicator );


long electionTimeout = config.get( CoreEdgeClusterSettings.leader_election_timeout ); long electionTimeout = config.get( CoreEdgeClusterSettings.leader_election_timeout );
MembershipWaiter<CoreMember> membershipWaiter = MembershipWaiter<CoreMember> membershipWaiter =
Expand Down Expand Up @@ -320,7 +325,7 @@ public EnterpriseCoreEditionModule( final PlatformModule platformModule,
config.get( CoreEdgeClusterSettings.transaction_listen_address ) ); config.get( CoreEdgeClusterSettings.transaction_listen_address ) );


life.add( CoreServerStartupProcess.createLifeSupport( life.add( CoreServerStartupProcess.createLifeSupport(
platformModule.dataSourceManager, replicatedIdGeneratorFactory, raft, new RaftLogReplay( raftLog, platformModule.dataSourceManager, replicatedIdGeneratorFactory, raft, new RaftLogReplay( monitoredRaftLog,
logProvider ), raftServer, logProvider ), raftServer,
catchupServer, raftTimeoutService, membershipWaiter, catchupServer, raftTimeoutService, membershipWaiter,
config.get( CoreEdgeClusterSettings.join_catch_up_timeout ), config.get( CoreEdgeClusterSettings.join_catch_up_timeout ),
Expand Down
Expand Up @@ -100,18 +100,21 @@ private static File coreServerStoreDirectory( File parentDir, int serverId )
return new File( parentDir, "server-core-" + serverId ); return new File( parentDir, "server-core-" + serverId );
} }


public static File edgeSeverStoreDirectory( File parentDir, int serverId ) public static File edgeServerStoreDirectory( File parentDir, int serverId )
{ {
return new File( parentDir, "server-edge-" + serverId ); return new File( parentDir, "server-edge-" + serverId );
} }


private static Map<String, String> serverParams( String serverType, int serverId, String initialHosts ) private Map<String, String> serverParams( String serverType, int serverId, String initialHosts )
{ {
Map<String, String> params = stringMap(); Map<String, String> params = stringMap();
params.put( "org.neo4j.server.database.mode", serverType ); params.put( "org.neo4j.server.database.mode", serverType );
params.put( ClusterSettings.cluster_name.name(), CLUSTER_NAME ); params.put( ClusterSettings.cluster_name.name(), CLUSTER_NAME );
params.put( ClusterSettings.server_id.name(), String.valueOf( serverId ) ); params.put( ClusterSettings.server_id.name(), String.valueOf( serverId ) );
params.put( CoreEdgeClusterSettings.initial_core_cluster_members.name(), initialHosts ); params.put( CoreEdgeClusterSettings.initial_core_cluster_members.name(), initialHosts );
params.put( "metrics.csv.enabled", "true" );
params.put( "metrics.neo4j.network.enabled", "true" );
params.put( "metrics.csv.path", "metrics" );
return params; return params;
} }


Expand Down Expand Up @@ -185,7 +188,7 @@ public CoreGraphDatabase startCoreServer( int serverId, int clusterSize, List<Ad


public EdgeGraphDatabase startEdgeServer( int serverId, List<AdvertisedSocketAddress> addresses ) public EdgeGraphDatabase startEdgeServer( int serverId, List<AdvertisedSocketAddress> addresses )
{ {
final File storeDir = edgeSeverStoreDirectory( parentDir, serverId ); final File storeDir = edgeServerStoreDirectory( parentDir, serverId );
return startEdgeServer( serverId, storeDir, addresses ); return startEdgeServer( serverId, storeDir, addresses );
} }


Expand Down

0 comments on commit a095933

Please sign in to comment.