Skip to content

Commit

Permalink
Add server metrics to metrics extension
Browse files Browse the repository at this point in the history
  • Loading branch information
davidegrohmann committed Mar 11, 2016
1 parent fc04860 commit 1a2878d
Show file tree
Hide file tree
Showing 23 changed files with 445 additions and 116 deletions.
Expand Up @@ -174,7 +174,7 @@ public DependencyResolver get()
transactionMonitor = dependencies.satisfyDependency( createTransactionStats() ); transactionMonitor = dependencies.satisfyDependency( createTransactionStats() );


kernelExtensions = dependencies.satisfyDependency( new KernelExtensions( kernelExtensions = dependencies.satisfyDependency( new KernelExtensions(
new SimpleKernelContext( fileSystem, storeDir, databaseInfo ), new SimpleKernelContext( fileSystem, storeDir, databaseInfo, dependencies ),
externalDependencies.kernelExtensions(), externalDependencies.kernelExtensions(),
dependencies, dependencies,
UnsatisfiedDependencyStrategies.fail() ) ); UnsatisfiedDependencyStrategies.fail() ) );
Expand Down
Expand Up @@ -23,6 +23,7 @@


import org.neo4j.io.fs.FileSystemAbstraction; import org.neo4j.io.fs.FileSystemAbstraction;
import org.neo4j.kernel.impl.factory.DatabaseInfo; import org.neo4j.kernel.impl.factory.DatabaseInfo;
import org.neo4j.kernel.impl.util.DependencySatisfier;


public interface KernelContext public interface KernelContext
{ {
Expand All @@ -31,4 +32,6 @@ public interface KernelContext
File storeDir(); File storeDir();


DatabaseInfo databaseInfo(); DatabaseInfo databaseInfo();

DependencySatisfier dependencySatisfier();
} }
Expand Up @@ -23,18 +23,22 @@


import org.neo4j.io.fs.FileSystemAbstraction; import org.neo4j.io.fs.FileSystemAbstraction;
import org.neo4j.kernel.impl.factory.DatabaseInfo; import org.neo4j.kernel.impl.factory.DatabaseInfo;
import org.neo4j.kernel.impl.util.DependencySatisfier;


public class SimpleKernelContext implements KernelContext public class SimpleKernelContext implements KernelContext
{ {
private final FileSystemAbstraction fileSystem; private final FileSystemAbstraction fileSystem;
private final File storeDir; private final File storeDir;
private final DatabaseInfo databaseInfo; private final DatabaseInfo databaseInfo;
private final DependencySatisfier satisfier;


public SimpleKernelContext( FileSystemAbstraction fileSystem, File storeDir, DatabaseInfo databaseInfo ) public SimpleKernelContext( FileSystemAbstraction fileSystem, File storeDir, DatabaseInfo databaseInfo,
DependencySatisfier satisfier )
{ {
this.fileSystem = fileSystem; this.fileSystem = fileSystem;
this.storeDir = storeDir; this.storeDir = storeDir;
this.databaseInfo = databaseInfo; this.databaseInfo = databaseInfo;
this.satisfier = satisfier;
} }


@Override @Override
Expand All @@ -54,4 +58,10 @@ public DatabaseInfo databaseInfo()
{ {
return databaseInfo; return databaseInfo;
} }

@Override
public DependencySatisfier dependencySatisfier()
{
return satisfier;
}
} }
Expand Up @@ -284,7 +284,7 @@ public Label apply( long from )
deps.satisfyDependencies( fileSystem, config, logService, indexStoreView ); deps.satisfyDependencies( fileSystem, config, logService, indexStoreView );


KernelExtensions extensions = life.add( new KernelExtensions( KernelExtensions extensions = life.add( new KernelExtensions(
new SimpleKernelContext( fileSystem, storeDir, DatabaseInfo.UNKNOWN ), new SimpleKernelContext( fileSystem, storeDir, DatabaseInfo.UNKNOWN, deps ),
kernelExtensions, deps, UnsatisfiedDependencyStrategies.ignore() ) ); kernelExtensions, deps, UnsatisfiedDependencyStrategies.ignore() ) );


SchemaIndexProvider provider = extensions.resolveDependency( SchemaIndexProvider.class, SchemaIndexProvider provider = extensions.resolveDependency( SchemaIndexProvider.class,
Expand Down
Expand Up @@ -143,7 +143,8 @@ public BatchingNeoStores( FileSystemAbstraction fileSystem, File storeDir, Confi
dependencies.satisfyDependency( this ); dependencies.satisfyDependency( this );
dependencies.satisfyDependency( logService ); dependencies.satisfyDependency( logService );
dependencies.satisfyDependency( IndexStoreView.EMPTY ); dependencies.satisfyDependency( IndexStoreView.EMPTY );
KernelContext kernelContext = new SimpleKernelContext( fileSystem, storeDir, DatabaseInfo.UNKNOWN ); KernelContext kernelContext = new SimpleKernelContext( fileSystem, storeDir, DatabaseInfo.UNKNOWN,
dependencies );
@SuppressWarnings( { "unchecked", "rawtypes" } ) @SuppressWarnings( { "unchecked", "rawtypes" } )
KernelExtensions extensions = life.add( new KernelExtensions( KernelExtensions extensions = life.add( new KernelExtensions(
kernelContext, (Iterable) Service.load( KernelExtensionFactory.class ), kernelContext, (Iterable) Service.load( KernelExtensionFactory.class ),
Expand Down
Expand Up @@ -54,6 +54,7 @@
import java.util.SortedSet; import java.util.SortedSet;
import java.util.TreeSet; import java.util.TreeSet;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
import java.util.function.Consumer;
import javax.servlet.DispatcherType; import javax.servlet.DispatcherType;
import javax.servlet.Filter; import javax.servlet.Filter;
import javax.servlet.ServletException; import javax.servlet.ServletException;
Expand Down Expand Up @@ -81,6 +82,7 @@ public class Jetty9WebServer implements WebServer
{ {
private boolean wadlEnabled; private boolean wadlEnabled;
private Collection<InjectableProvider<?>> defaultInjectables; private Collection<InjectableProvider<?>> defaultInjectables;
private Consumer<Server> jettyCreatedCallback;


private static class FilterDefinition private static class FilterDefinition
{ {
Expand Down Expand Up @@ -167,6 +169,10 @@ public void start() throws Exception
} }
} }


if ( jettyCreatedCallback != null )
{
jettyCreatedCallback.accept( jetty );
}
} }


handlers = new HandlerList(); handlers = new HandlerList();
Expand Down Expand Up @@ -280,6 +286,11 @@ public void setDefaultInjectables( Collection<InjectableProvider<?>> defaultInje
this.defaultInjectables = defaultInjectables; this.defaultInjectables = defaultInjectables;
} }


public void setJettyCreatedCallback( Consumer<Server> callback )
{
this.jettyCreatedCallback = callback;
}

@Override @Override
public void removeJAXRSPackages( List<String> packageNames, String serverMountPoint ) public void removeJAXRSPackages( List<String> packageNames, String serverMountPoint )
{ {
Expand Down
Expand Up @@ -19,10 +19,13 @@
*/ */
package org.neo4j.server.web; package org.neo4j.server.web;


import org.eclipse.jetty.server.Server;

import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.util.Collection; import java.util.Collection;
import java.util.List; import java.util.List;
import java.util.function.Consumer;
import javax.servlet.Filter; import javax.servlet.Filter;
import javax.servlet.ServletException; import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletRequest;
Expand Down Expand Up @@ -70,4 +73,6 @@ void invokeDirectly( String targetUri, HttpServletRequest request, HttpServletRe
void setWadlEnabled( boolean wadlEnabled ); void setWadlEnabled( boolean wadlEnabled );


void setDefaultInjectables( Collection<InjectableProvider<?>> defaultInjectables ); void setDefaultInjectables( Collection<InjectableProvider<?>> defaultInjectables );

void setJettyCreatedCallback( Consumer<Server> callback );
} }
Expand Up @@ -665,7 +665,7 @@ public void shouldContainTransactionsThatHappenDuringBackupProcess() throws Thro


OnlineBackupKernelExtension backup = (OnlineBackupKernelExtension) OnlineBackupKernelExtension backup = (OnlineBackupKernelExtension)
new OnlineBackupExtensionFactory().newInstance( new OnlineBackupExtensionFactory().newInstance(
new SimpleKernelContext( fileSystem, storeDir, DatabaseInfo.UNKNOWN ), new SimpleKernelContext( fileSystem, storeDir, DatabaseInfo.UNKNOWN, dependencies ),
DependenciesProxy.dependencies( dependencies, OnlineBackupExtensionFactory.Dependencies.class ) DependenciesProxy.dependencies( dependencies, OnlineBackupExtensionFactory.Dependencies.class )
); );
backup.start(); backup.start();
Expand Down
Expand Up @@ -183,7 +183,8 @@ public Integer call() throws Exception
LifeSupport life = new LifeSupport(); LifeSupport life = new LifeSupport();
try try
{ {
SimpleKernelContext context = new SimpleKernelContext( fileSystem, storeDir, DatabaseInfo.UNKNOWN ); SimpleKernelContext context = new SimpleKernelContext( fileSystem, storeDir, DatabaseInfo.UNKNOWN,
dependencies );
OnlineBackupExtensionFactory.Dependencies extensionDeps = DependenciesProxy.dependencies( OnlineBackupExtensionFactory.Dependencies extensionDeps = DependenciesProxy.dependencies(
dependencies, OnlineBackupExtensionFactory.Dependencies.class ); dependencies, OnlineBackupExtensionFactory.Dependencies.class );
life.add( new OnlineBackupExtensionFactory().newInstance( context, extensionDeps ) ); life.add( new OnlineBackupExtensionFactory().newInstance( context, extensionDeps ) );
Expand Down
Expand Up @@ -77,6 +77,9 @@ public class MetricsSettings
@Description( "Enable reporting metrics about HA cluster info." ) @Description( "Enable reporting metrics about HA cluster info." )
public static Setting<Boolean> neoClusterEnabled = setting( public static Setting<Boolean> neoClusterEnabled = setting(
"metrics.neo4j.cluster.enabled", Settings.BOOLEAN, neoEnabled ); "metrics.neo4j.cluster.enabled", Settings.BOOLEAN, neoEnabled );
@Description( "Enable reporting metrics about Server threading info." )
public static Setting<Boolean> neoServerEnabled = setting(
"metrics.neo4j.server.enabled", Settings.BOOLEAN, neoEnabled );


@Description( "Enable reporting metrics about the duration of garbage collections" ) @Description( "Enable reporting metrics about the duration of garbage collections" )
public static Setting<Boolean> jvmGcEnabled = setting( "metrics.jvm.gc.enabled", Settings.BOOLEAN, neoEnabled ); public static Setting<Boolean> jvmGcEnabled = setting( "metrics.jvm.gc.enabled", Settings.BOOLEAN, neoEnabled );
Expand Down
Expand Up @@ -54,6 +54,7 @@
import org.neo4j.metrics.source.jvm.MemoryBuffersMetrics; import org.neo4j.metrics.source.jvm.MemoryBuffersMetrics;
import org.neo4j.metrics.source.jvm.MemoryPoolMetrics; import org.neo4j.metrics.source.jvm.MemoryPoolMetrics;
import org.neo4j.metrics.source.jvm.ThreadMetrics; import org.neo4j.metrics.source.jvm.ThreadMetrics;
import org.neo4j.metrics.source.server.ServerMetrics;


public class Neo4jMetricsBuilder public class Neo4jMetricsBuilder
{ {
Expand Down Expand Up @@ -172,9 +173,9 @@ public boolean build()
result = true; result = true;
} }


if( config.get( MetricsSettings.boltMessagesEnabled )) if ( config.get( MetricsSettings.boltMessagesEnabled ) )
{ {
life.add( new BoltMetrics( registry, dependencies.monitors() )); life.add( new BoltMetrics( registry, dependencies.monitors() ) );
result = true; result = true;
} }


Expand All @@ -193,14 +194,14 @@ public boolean build()
if ( config.get( MetricsSettings.coreEdgeEnabled ) ) if ( config.get( MetricsSettings.coreEdgeEnabled ) )
{ {
OperationalMode mode = kernelContext.databaseInfo().operationalMode; OperationalMode mode = kernelContext.databaseInfo().operationalMode;
if ( mode == OperationalMode.core) if ( mode == OperationalMode.core )
{ {
life.add( new CoreMetrics( dependencies.monitors(), registry, dependencies.raft() ) ); life.add( new CoreMetrics( dependencies.monitors(), registry, dependencies.raft() ) );
result = true; result = true;
} }
else if ( mode == OperationalMode.edge) else if ( mode == OperationalMode.edge )
{ {
life.add( new EdgeMetrics( dependencies.monitors(), registry) ); life.add( new EdgeMetrics( dependencies.monitors(), registry ) );
result = true; result = true;
} }
else else
Expand All @@ -210,6 +211,12 @@ else if ( mode == OperationalMode.edge)
} }
} }


if ( config.get( MetricsSettings.neoServerEnabled ) )
{
life.add( new ServerMetrics( registry, logService, kernelContext.dependencySatisfier() ) );
result = true;
}

return result; return result;
} }
} }
@@ -0,0 +1,86 @@
/*
* Copyright (c) 2002-2016 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.metrics.source.server;

import com.codahale.metrics.Gauge;
import com.codahale.metrics.MetricRegistry;

import org.neo4j.kernel.impl.annotations.Documented;
import org.neo4j.kernel.impl.logging.LogService;
import org.neo4j.kernel.impl.util.DependencySatisfier;
import org.neo4j.kernel.lifecycle.LifecycleAdapter;
import org.neo4j.logging.Log;

import static com.codahale.metrics.MetricRegistry.name;

@Documented( ".Server Metrics" )
public class ServerMetrics extends LifecycleAdapter
{
private static final String NAME_PREFIX = "neo4j.server";

@Documented( "The total number of idle threads in the jetty pool" )
public static final String THREAD_JETTY_IDLE = name( NAME_PREFIX, "threads.jetty.idle" );
@Documented( "The total number of threads (both idle and busy) in the jetty pool" )
public static final String THREAD_JETTY_ALL = name( NAME_PREFIX, "threads.jetty.all" );

private final MetricRegistry registry;
private volatile ServerThreadView serverThreadView;

public ServerMetrics( MetricRegistry registry, LogService logService, DependencySatisfier satisfier )
{
Log userLog = logService.getUserLog( getClass() );
this.registry = registry;
this.serverThreadView = new ServerThreadView()
{
@Override
public int idleThreads()
{
userLog.warn( "Server thread metrics not available (missing " + THREAD_JETTY_IDLE + ")" );
return -1;
}

@Override
public int allThreads()
{
userLog.warn( "Server thread metrics not available (missing " + THREAD_JETTY_ALL + ")" );
return -1;
}
};
satisfier.satisfyDependency( (ServerThreadViewSetter) serverThreadView -> {
assert ServerMetrics.this.serverThreadView != null;
ServerMetrics.this.serverThreadView = serverThreadView;
userLog.info( "Server thread metrics has been registered successfully" );
} );
}

@Override
public void start()
{
registry.register( THREAD_JETTY_IDLE, (Gauge<Integer>) () -> serverThreadView.idleThreads() );
registry.register( THREAD_JETTY_ALL, (Gauge<Integer>) () -> serverThreadView.allThreads() );
}

@Override
public void stop()
{
registry.remove( THREAD_JETTY_IDLE );
registry.remove( THREAD_JETTY_ALL );
}
}
@@ -0,0 +1,26 @@
/*
* Copyright (c) 2002-2016 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.metrics.source.server;

public interface ServerThreadView
{
int allThreads();
int idleThreads();
}
@@ -0,0 +1,25 @@
/*
* Copyright (c) 2002-2016 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.metrics.source.server;

public interface ServerThreadViewSetter
{
void set( ServerThreadView serverThreadView );
}
Expand Up @@ -43,8 +43,8 @@
import static org.neo4j.bolt.v1.transport.integration.TransportTestUtil.chunk; import static org.neo4j.bolt.v1.transport.integration.TransportTestUtil.chunk;
import static org.neo4j.graphdb.factory.GraphDatabaseSettings.boltConnector; import static org.neo4j.graphdb.factory.GraphDatabaseSettings.boltConnector;
import static org.neo4j.helpers.collection.MapUtil.map; import static org.neo4j.helpers.collection.MapUtil.map;
import static org.neo4j.metrics.CoreEdgeMetricsIT.metricsCsv; import static org.neo4j.metrics.MetricsTestHelper.metricsCsv;
import static org.neo4j.metrics.CoreEdgeMetricsIT.readLastValue; import static org.neo4j.metrics.MetricsTestHelper.readLongValue;
import static org.neo4j.test.Assert.assertEventually; import static org.neo4j.test.Assert.assertEventually;


public class BoltMetricsIT public class BoltMetricsIT
Expand Down Expand Up @@ -80,20 +80,20 @@ public void shouldMonitorBolt() throws Throwable


// Then // Then
assertEventually( "init request shows up as recieved", assertEventually( "init request shows up as recieved",
() -> readLastValue( metricsCsv( metricsFolder, BoltMetrics.MESSAGES_RECIEVED ) ), () -> readLongValue( metricsCsv( metricsFolder, BoltMetrics.MESSAGES_RECIEVED ) ),
equalTo( 1L ), 5, TimeUnit.SECONDS ); equalTo( 1L ), 5, TimeUnit.SECONDS );
assertEventually( "init request shows up as started", assertEventually( "init request shows up as started",
() -> readLastValue( metricsCsv( metricsFolder, BoltMetrics.MESSAGES_STARTED ) ), () -> readLongValue( metricsCsv( metricsFolder, BoltMetrics.MESSAGES_STARTED ) ),
equalTo( 1L ), 5, TimeUnit.SECONDS ); equalTo( 1L ), 5, TimeUnit.SECONDS );
assertEventually( "init request shows up as done", assertEventually( "init request shows up as done",
() -> readLastValue( metricsCsv( metricsFolder, BoltMetrics.MESSAGES_DONE ) ), () -> readLongValue( metricsCsv( metricsFolder, BoltMetrics.MESSAGES_DONE ) ),
equalTo( 1L ), 5, TimeUnit.SECONDS ); equalTo( 1L ), 5, TimeUnit.SECONDS );


assertEventually( "queue time shows up", assertEventually( "queue time shows up",
() -> readLastValue( metricsCsv( metricsFolder, BoltMetrics.TOTAL_QUEUE_TIME ) ), () -> readLongValue( metricsCsv( metricsFolder, BoltMetrics.TOTAL_QUEUE_TIME ) ),
greaterThanOrEqualTo( 0L ), 5, TimeUnit.SECONDS ); greaterThanOrEqualTo( 0L ), 5, TimeUnit.SECONDS );
assertEventually( "processing time shows up", assertEventually( "processing time shows up",
() -> readLastValue( metricsCsv( metricsFolder, BoltMetrics.TOTAL_PROCESSING_TIME ) ), () -> readLongValue( metricsCsv( metricsFolder, BoltMetrics.TOTAL_PROCESSING_TIME ) ),
greaterThanOrEqualTo( 0L ), 5, TimeUnit.SECONDS ); greaterThanOrEqualTo( 0L ), 5, TimeUnit.SECONDS );


} }
Expand Down

0 comments on commit 1a2878d

Please sign in to comment.