Added Master/Available cluster metrics
Implemented metrics that report the IS_MASTER and IS_AVAILABLE status of
a cluster member.
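
A minimal sketch of the idea behind the two new gauges, using the Dropwizard
MetricRegistry the metrics module is built on (the RoleGaugeSketch class, the
hard-coded currentRole value and the role strings are illustrative, not the
shipped code):

import com.codahale.metrics.Gauge;
import com.codahale.metrics.MetricRegistry;

public class RoleGaugeSketch
{
    public static void main( String[] args )
    {
        MetricRegistry registry = new MetricRegistry();
        // In the real extension the role comes from ClusterMembers.getCurrentMemberRole()
        final String currentRole = "master";

        // 1 while this member holds the master role, 0 otherwise
        registry.register( "neo4j.cluster.is_master",
                (Gauge<Integer>) () -> "master".equals( currentRole ) ? 1 : 0 );

        // 1 while the member reports any known role, 0 while its role is still UNKNOWN
        registry.register( "neo4j.cluster.is_available",
                (Gauge<Integer>) () -> !"UNKNOWN".equals( currentRole ) ? 1 : 0 );

        System.out.println( registry.getGauges().get( "neo4j.cluster.is_master" ).getValue() ); // 1
    }
}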

Changed the cluster metrics to obtain ClusterMembers through a dependency
resolver at runtime, so that community builds can still build the metrics
package even though these metrics need enterprise/HA features.
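
Condensed, the runtime lookup works like the sketch below; the helper class
and method name are made up, but the resolveDependency call and the
IllegalArgumentException it throws when no HA component satisfies the
dependency are the same ones used in ClusterMetrics.start() further down:

import org.neo4j.graphdb.DependencyResolver;
import org.neo4j.kernel.ha.cluster.member.ClusterMembers;

class OptionalClusterMembersLookup
{
    /** Returns the ClusterMembers of an HA database, or null when the database is not clustered. */
    static ClusterMembers tryResolve( DependencyResolver resolver )
    {
        try
        {
            return resolver.resolveDependency( ClusterMembers.class );
        }
        catch ( IllegalArgumentException e )
        {
            // Not an HA database: the caller skips registering the cluster gauges.
            return null;
        }
    }
}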

Added an additional test for the case where cluster metrics are not
available but reporters still attempt to read their values. Previously this
threw a NullPointerException; now those metrics simply return a negative
response (the role gauges report 0).
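
A hypothetical JUnit sketch of that expectation (not the test added in this
commit; the class and gauge wiring below are made up): a role gauge built
while ClusterMembers is unresolved should answer 0 instead of throwing.

import static org.junit.Assert.assertEquals;

import com.codahale.metrics.Gauge;
import com.codahale.metrics.MetricRegistry;
import org.junit.Test;

public class MissingClusterMembersGaugeTest
{
    @Test
    public void roleGaugeAnswersZeroWhenClusterStateIsUnavailable()
    {
        MetricRegistry registry = new MetricRegistry();
        final Object clusterMembers = null; // the dependency could not be resolved

        registry.register( "neo4j.cluster.is_master",
                (Gauge<Integer>) () -> clusterMembers != null ? 1 : 0 );

        Gauge<?> gauge = registry.getGauges().get( "neo4j.cluster.is_master" );
        assertEquals( Integer.valueOf( 0 ), gauge.getValue() ); // a negative response, not a NullPointerException
    }
}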
Max Sumrall authored and davidegrohmann committed Nov 13, 2015
1 parent 7b10131 commit 1adb015
Showing 7 changed files with 311 additions and 35 deletions.
7 changes: 6 additions & 1 deletion enterprise/metrics/pom.xml
@@ -100,7 +100,7 @@

<dependency>
<groupId>org.neo4j</groupId>
<artifactId>neo4j-kernel</artifactId>
<artifactId>neo4j-ha</artifactId>
<version>${project.version}</version>
<scope>test</scope>
<type>test-jar</type>
@@ -137,5 +137,10 @@
<artifactId>hamcrest-all</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>
@@ -22,9 +22,11 @@
import com.codahale.metrics.MetricRegistry;

import org.neo4j.cluster.ClusterSettings;
import org.neo4j.graphdb.DependencyResolver;
import org.neo4j.io.pagecache.monitoring.PageCacheMonitor;
import org.neo4j.kernel.IdGeneratorFactory;
import org.neo4j.kernel.configuration.Config;
import org.neo4j.kernel.ha.cluster.member.ClusterMembers;
import org.neo4j.kernel.impl.api.LogRotationMonitor;
import org.neo4j.kernel.impl.logging.LogService;
import org.neo4j.kernel.impl.transaction.TransactionCounters;
@@ -49,7 +51,7 @@ public class MetricsExtension implements Lifecycle
private final CheckPointerMonitor checkPointerMonitor;
private final IdGeneratorFactory idGeneratorFactory;
private final LogRotationMonitor logRotationMonitor;

private final DependencyResolver dependencyResolver;

public MetricsExtension( MetricsKernelExtensionFactory.Dependencies dependencies )
{
@@ -62,6 +64,7 @@ public MetricsExtension( MetricsKernelExtensionFactory.Dependencies dependencies
checkPointerMonitor = dependencies.checkPointerCounters();
logRotationMonitor = dependencies.logRotationCounters();
idGeneratorFactory = dependencies.idGeneratorFactory();
dependencyResolver = dependencies.getDependencyResolver();
}

@Override
@@ -83,7 +86,8 @@ public void init() throws Throwable

// Setup metric gathering
Neo4jMetricsFactory factory = new Neo4jMetricsFactory( registry, configuration, monitors,
transactionCounters, pageCacheCounters, checkPointerMonitor, logRotationMonitor, idGeneratorFactory );
transactionCounters, pageCacheCounters, checkPointerMonitor, logRotationMonitor, idGeneratorFactory,
dependencyResolver, logService );
life.add( factory.newInstance() );

life.init();
@@ -19,6 +19,7 @@
*/
package org.neo4j.metrics;

import org.neo4j.graphdb.DependencyResolver;
import org.neo4j.io.pagecache.monitoring.PageCacheMonitor;
import org.neo4j.kernel.IdGeneratorFactory;
import org.neo4j.kernel.configuration.Config;
@@ -51,6 +52,8 @@ public interface Dependencies
IdGeneratorFactory idGeneratorFactory();

Monitors monitors();

DependencyResolver getDependencyResolver();
}

public MetricsKernelExtensionFactory()
@@ -25,39 +25,57 @@
import java.io.IOException;
import java.util.concurrent.atomic.AtomicLong;

import org.neo4j.function.Predicate;
import org.neo4j.function.Predicates;
import org.neo4j.graphdb.DependencyResolver;
import org.neo4j.kernel.configuration.Config;
import org.neo4j.kernel.ha.SlaveUpdatePuller;
import org.neo4j.kernel.ha.cluster.member.ClusterMembers;
import org.neo4j.kernel.impl.logging.LogService;
import org.neo4j.kernel.lifecycle.LifecycleAdapter;
import org.neo4j.kernel.monitoring.Monitors;
import org.neo4j.metrics.MetricsSettings;

import static com.codahale.metrics.MetricRegistry.name;
import static org.neo4j.kernel.ha.cluster.modeswitch.HighAvailabilityModeSwitcher.MASTER;
import static org.neo4j.kernel.ha.cluster.modeswitch.HighAvailabilityModeSwitcher.UNKNOWN;

public class ClusterMetrics extends LifecycleAdapter
{
private static final String NAME_PREFIX = "neo4j.cluster";
private static final String SLAVE_PULL_UPDATES = name( NAME_PREFIX, "slave_pull_updates" );
private static final String SLAVE_PULL_UPDATE_UP_TO_TX = name( NAME_PREFIX, "slave_pull_update_up_to_tx" );
static final String IS_MASTER = name( NAME_PREFIX, "is_master" );
static final String IS_AVAILABLE = name( NAME_PREFIX, "is_available" );

private final Config config;
private final Monitors monitors;
private final MetricRegistry registry;
private final DependencyResolver dependencyResolver;
private final LogService logService;
private final SlaveUpdatePullerMonitor monitor = new SlaveUpdatePullerMonitor();
private ClusterMembers clusterMembers = null;

public ClusterMetrics( Config config, Monitors monitors, MetricRegistry registry )
public ClusterMetrics( Config config, Monitors monitors, MetricRegistry registry,
DependencyResolver dependencyResolver, LogService logService )
{
this.config = config;
this.monitors = monitors;
this.registry = registry;
this.dependencyResolver = dependencyResolver;
this.logService = logService;
}

@Override
public void start() throws Throwable
{
if ( config.get( MetricsSettings.neoClusterEnabled ) )
if ( config.get( MetricsSettings.neoClusterEnabled ) && resolveClusterMembersDependencyOrLogWarning() )
{
monitors.addMonitorListener( monitor );

registry.register( IS_MASTER, new RoleGauge( Predicates.equalTo( MASTER ) ) );
registry.register( IS_AVAILABLE, new RoleGauge( Predicates.not( Predicates.equalTo( UNKNOWN ) ) ) );

registry.register( SLAVE_PULL_UPDATES, new Gauge<Long>()
{
@Override
@@ -75,17 +93,36 @@ public Long getValue()
return monitor.lastAppliedTxId;
}
} );

}
}

private boolean resolveClusterMembersDependencyOrLogWarning()
{
try
{
clusterMembers = dependencyResolver.resolveDependency( ClusterMembers.class );
return true;
}
catch ( IllegalArgumentException e )
{
logService.getUserLog( getClass() ).warn( "Cluster metrics were enabled but the graph database " +
"is not in HA mode." );
return false;
}
}

@Override
public void stop() throws IOException
{
if ( config.get( MetricsSettings.neoClusterEnabled ) )
if ( config.get( MetricsSettings.neoClusterEnabled ) && (clusterMembers != null) )
{
registry.remove( SLAVE_PULL_UPDATES );
registry.remove( SLAVE_PULL_UPDATE_UP_TO_TX );

registry.remove( IS_MASTER );
registry.remove( IS_AVAILABLE );

monitors.removeMonitorListener( monitor );
}
}
@@ -102,4 +139,24 @@ public void pulledUpdates( long lastAppliedTxId )
this.lastAppliedTxId = lastAppliedTxId;
}
}

private class RoleGauge implements Gauge<Integer>
{
private Predicate<String> rolePredicate;

public RoleGauge( Predicate<String> rolePredicate )
{
this.rolePredicate = rolePredicate;
}

@Override
public Integer getValue()
{
int value = 0;
if ( clusterMembers != null )
{
value = rolePredicate.test( clusterMembers.getCurrentMemberRole() ) ? 1 : 0;
}
return value;
}
}
}
@@ -24,10 +24,12 @@
import java.io.IOException;

import org.neo4j.function.Factory;
import org.neo4j.graphdb.DependencyResolver;
import org.neo4j.io.pagecache.monitoring.PageCacheMonitor;
import org.neo4j.kernel.IdGeneratorFactory;
import org.neo4j.kernel.configuration.Config;
import org.neo4j.kernel.impl.api.LogRotationMonitor;
import org.neo4j.kernel.impl.logging.LogService;
import org.neo4j.kernel.impl.transaction.TransactionCounters;
import org.neo4j.kernel.impl.transaction.log.checkpoint.CheckPointerMonitor;
import org.neo4j.kernel.lifecycle.Lifecycle;
@@ -44,11 +46,13 @@ public class Neo4jMetricsFactory implements Factory<Lifecycle>
private final CheckPointerMonitor checkPointerMonitor;
private final LogRotationMonitor logRotationMonitor;
private final IdGeneratorFactory idGeneratorFactory;
private final DependencyResolver dependencyResolver;
private final LogService logService;

public Neo4jMetricsFactory( MetricRegistry registry, Config config, Monitors monitors,
TransactionCounters transactionCounters, PageCacheMonitor pageCacheCounters,
CheckPointerMonitor checkPointerMonitor, LogRotationMonitor logRotationMonitor,
IdGeneratorFactory idGeneratorFactory )
IdGeneratorFactory idGeneratorFactory, DependencyResolver dependencyResolver, LogService logService )
{
this.registry = registry;
this.config = config;
@@ -58,6 +62,8 @@ public Neo4jMetricsFactory( MetricRegistry registry, Config config, Monitors mon
this.checkPointerMonitor = checkPointerMonitor;
this.logRotationMonitor = logRotationMonitor;
this.idGeneratorFactory = idGeneratorFactory;
this.dependencyResolver = dependencyResolver;
this.logService = logService;
}

@Override
@@ -66,9 +72,10 @@ public Lifecycle newInstance()
final DBMetrics dbMetrics = new DBMetrics( registry, config,
transactionCounters, pageCacheCounters, checkPointerMonitor, logRotationMonitor, idGeneratorFactory );
final NetworkMetrics networkMetrics = new NetworkMetrics( config, monitors, registry );
final ClusterMetrics clusterMetrics = new ClusterMetrics( config, monitors, registry );
final JvmMetrics jvmMetrics = new JvmMetrics( config, registry );
final ClusterMetrics clusterMetrics = new ClusterMetrics( config, monitors, registry, dependencyResolver,
logService );
final CypherMetrics cypherMetrics = new CypherMetrics( config, monitors, registry );
final JvmMetrics jvmMetrics = new JvmMetrics( config, registry );
return new LifecycleAdapter()
{
@Override
@@ -19,32 +19,34 @@
*/
package org.neo4j.metrics;

import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

import org.neo4j.graphdb.DynamicLabel;
import org.neo4j.graphdb.GraphDatabaseService;
import org.neo4j.graphdb.Node;
import org.neo4j.graphdb.Transaction;
import org.neo4j.graphdb.factory.GraphDatabaseFactory;
import org.neo4j.graphdb.factory.GraphDatabaseSettings;
import org.neo4j.helpers.Settings;
import org.neo4j.kernel.ha.HighlyAvailableGraphDatabase;
import org.neo4j.kernel.impl.ha.ClusterManager;
import org.neo4j.metrics.source.CypherMetrics;
import org.neo4j.test.TargetDirectory;
import org.neo4j.test.ha.ClusterRule;

import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.not;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.junit.Assert.assertThat;
import static org.neo4j.graphdb.factory.GraphDatabaseSettings.cypher_min_replan_interval;
import static org.neo4j.kernel.impl.ha.ClusterManager.clusterOfSize;
import static org.neo4j.metrics.MetricsSettings.CsvFile.single;
import static org.neo4j.metrics.MetricsSettings.csvEnabled;
import static org.neo4j.metrics.MetricsSettings.csvFile;
@@ -54,34 +56,32 @@ public class MetricsKernelExtensionFactoryIT
{
@Rule
public final TargetDirectory.TestDirectory folder = TargetDirectory.testDirForTest( getClass() );
@Rule
public final ClusterRule clusterRule = new ClusterRule( getClass() );

GraphDatabaseService db;
private File outputFile;
private ClusterManager.ManagedCluster cluster;
private HighlyAvailableGraphDatabase db;

@Before
public void setup() throws IOException
public void setup() throws Exception
{
String dbPath = folder.directory( "data" ).getAbsolutePath();
outputFile = folder.file( "metrics.csv" );
db = new GraphDatabaseFactory().newEmbeddedDatabaseBuilder( dbPath ).
setConfig( GraphDatabaseSettings.allow_store_upgrade, Settings.TRUE ).
setConfig( GraphDatabaseSettings.cypher_min_replan_interval, "0" ).
setConfig( csvEnabled, Settings.TRUE ).
setConfig( csvFile, single.name() ).
setConfig( csvPath, outputFile.getAbsolutePath() ).newGraphDatabase();
}

@After
public void shutdown()
{
db.shutdown();
Map<String,String> config = new HashMap<>();
config.put( csvEnabled.name(), Settings.TRUE );
config.put( cypher_min_replan_interval.name(), "0" );
config.put( csvFile.name(), single.name() );
config.put( csvPath.name(), outputFile.getAbsolutePath() );
cluster = clusterRule.withSharedConfig( config ).withProvider( clusterOfSize( 1 ) ).startCluster();
db = cluster.getMaster();
}

@Test
public void mustLoadMetricsExtensionWhenConfigured() throws Exception
public void mustLoadMetricsExtensionWhenConfigured() throws Throwable
{
// Create some activity that will show up in the metrics data.
addNodes( 1000 );
cluster.stop();

// Awesome. Let's get some metric numbers.
// We should at least have a "timestamp" column, and a "neo4j.transaction.committed" column
@@ -106,12 +106,12 @@
}

@Test
public void showReplanEvents() throws Exception
public void showReplanEvents() throws Throwable
{
//do a simple query to populate cache
try ( Transaction tx = db.beginTx() )
{
db.execute( "match (n:Label {name: 'Pontus'}) return n.name");
db.execute( "match (n:Label {name: 'Pontus'}) return n.name" );
tx.success();
}
//add some data, should make plan stale
@@ -124,12 +124,13 @@ public void showReplanEvents() throws Throwable
}
//add more data just to make sure metrics are flushed
addNodes( 1000 );
cluster.stop();

//now we should have one replan event
try ( BufferedReader reader = new BufferedReader( new FileReader( outputFile ) ) )
{
String[] headers = reader.readLine().split( "," );
int replanColumn = Arrays.binarySearch( headers, CypherMetrics.REPLAN_EVENTS);
int replanColumn = Arrays.binarySearch( headers, CypherMetrics.REPLAN_EVENTS );
assertThat( replanColumn, is( not( -1 ) ) );

// Now we can verify that the number of committed transactions should never decrease.
@@ -147,7 +148,8 @@ public void showReplanEvents() throws Throwable
}
}

private void addNodes( int numberOfNodes ) {
private void addNodes( int numberOfNodes )
{
for ( int i = 0; i < numberOfNodes; i++ )
{
try ( Transaction tx = db.beginTx() )
@@ -158,5 +160,4 @@ private void addNodes( int numberOfNodes )
}
}
}

}
