Skip to content

Commit

Permalink
Add Bolt message metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
jakewins committed Feb 11, 2016
1 parent c8be444 commit 71640a2
Show file tree
Hide file tree
Showing 11 changed files with 298 additions and 49 deletions.
@@ -1,3 +1,22 @@
/*
* Copyright (c) 2002-2016 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.bolt.v1.runtime; package org.neo4j.bolt.v1.runtime;


import java.time.Clock; import java.time.Clock;
Expand Down Expand Up @@ -60,36 +79,36 @@ public String key()
@Override @Override
public <A> void init( String clientName, A attachment, Callback<Void,A> callback ) public <A> void init( String clientName, A attachment, Callback<Void,A> callback )
{ {
monitor.messageRecieved(); monitor.messageReceived();
delegate.init( clientName, attachment, withMonitor( callback ) ); delegate.init( clientName, attachment, withMonitor( callback ) );
} }


@Override @Override
public <A> void run( String statement, Map<String,Object> params, A attachment, public <A> void run( String statement, Map<String,Object> params, A attachment,
Callback<StatementMetadata,A> callback ) Callback<StatementMetadata,A> callback )
{ {
monitor.messageRecieved(); monitor.messageReceived();
delegate.run( statement, params, attachment, withMonitor( callback ) ); delegate.run( statement, params, attachment, withMonitor( callback ) );
} }


@Override @Override
public <A> void pullAll( A attachment, Callback<RecordStream,A> callback ) public <A> void pullAll( A attachment, Callback<RecordStream,A> callback )
{ {
monitor.messageRecieved(); monitor.messageReceived();
delegate.pullAll( attachment, withMonitor( callback ) ); delegate.pullAll( attachment, withMonitor( callback ) );
} }


@Override @Override
public <A> void discardAll( A attachment, Callback<Void,A> callback ) public <A> void discardAll( A attachment, Callback<Void,A> callback )
{ {
monitor.messageRecieved(); monitor.messageReceived();
delegate.discardAll( attachment, withMonitor( callback ) ); delegate.discardAll( attachment, withMonitor( callback ) );
} }


@Override @Override
public <A> void reset( A attachment, Callback<Void,A> callback ) public <A> void reset( A attachment, Callback<Void,A> callback )
{ {
monitor.messageRecieved(); monitor.messageReceived();
delegate.reset( attachment, withMonitor( callback ) ); delegate.reset( attachment, withMonitor( callback ) );
} }


Expand Down Expand Up @@ -159,14 +178,12 @@ public interface SessionMonitor
/** /**
* Called whenever a request is received. This happens after a request is * Called whenever a request is received. This happens after a request is
* deserialized, but before it is queued pending processing. * deserialized, but before it is queued pending processing.
* @return an event object that will be invoked as the request moves through
* Bolt internals.
*/ */
void messageRecieved(); void messageReceived();


/** /**
* Called after a request is done queueing, right before the worker thread takes on the request * Called after a request is done queueing, right before the worker thread takes on the request
* @param queueTime time between {@link #messageRecieved()} and this call, in milliseconds * @param queueTime time between {@link #messageReceived()} and this call, in milliseconds
*/ */
void processingStarted( long queueTime ); void processingStarted( long queueTime );


Expand Down
Expand Up @@ -583,7 +583,11 @@ public String toString()
*/ */
private void before( Object attachment, Callback cb ) private void before( Object attachment, Callback cb )
{ {
cb.started( attachment ); if( cb != null )
{
cb.started( attachment );
}

if ( hasTransaction() ) if ( hasTransaction() )
{ {
txBridge.bindTransactionToCurrentThread( currentTransaction ); txBridge.bindTransactionToCurrentThread( currentTransaction );
Expand Down
Expand Up @@ -45,8 +45,7 @@ public class ThreadedSessions implements Sessions
private JobScheduler scheduler; private JobScheduler scheduler;
private LogService logging; private LogService logging;


public ThreadedSessions( Sessions delegate, JobScheduler scheduler, public ThreadedSessions( Sessions delegate, JobScheduler scheduler, LogService logging )
LogService logging )
{ {
this.delegate = delegate; this.delegate = delegate;
this.scheduler = scheduler; this.scheduler = scheduler;
Expand Down
@@ -1,3 +1,22 @@
/*
* Copyright (c) 2002-2016 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.bolt.v1.runtime; package org.neo4j.bolt.v1.runtime;


import org.junit.Test; import org.junit.Test;
Expand Down Expand Up @@ -86,7 +105,7 @@ private static class CountingSessionMonitor implements MonitoredSessions.Session
public long processingTime = 0; public long processingTime = 0;


@Override @Override
public void messageRecieved() public void messageReceived()
{ {
messagesRecieved++; messagesRecieved++;
} }
Expand Down

This file was deleted.

13 changes: 13 additions & 0 deletions enterprise/metrics/pom.xml
Expand Up @@ -100,6 +100,12 @@
<scope>provided</scope> <scope>provided</scope>
</dependency> </dependency>


<dependency>
<groupId>org.neo4j</groupId>
<artifactId>neo4j-bolt</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>


<dependency> <dependency>
<groupId>org.neo4j</groupId> <groupId>org.neo4j</groupId>
Expand Down Expand Up @@ -162,6 +168,13 @@
<scope>test</scope> <scope>test</scope>
<type>test-jar</type> <type>test-jar</type>
</dependency> </dependency>
<dependency>
<groupId>org.neo4j</groupId>
<artifactId>neo4j-bolt</artifactId>
<version>${project.version}</version>
<scope>test</scope>
<type>test-jar</type>
</dependency>


<dependency> <dependency>
<groupId>io.dropwizard.metrics</groupId> <groupId>io.dropwizard.metrics</groupId>
Expand Down
Expand Up @@ -90,6 +90,9 @@ public class MetricsSettings
@Description( "Enable reporting metrics about number of occurred replanning events." ) @Description( "Enable reporting metrics about number of occurred replanning events." )
public static Setting<Boolean> cypherPlanningEnabled = setting( "metrics.cypher.replanning.enabled", Settings.BOOLEAN, neoEnabled ); public static Setting<Boolean> cypherPlanningEnabled = setting( "metrics.cypher.replanning.enabled", Settings.BOOLEAN, neoEnabled );


@Description( "Enable reporting metrics about Bolt Protocol message processing." )
public static Setting<Boolean> boltMessagesEnabled = setting( "metrics.bolt.messages.enabled", Settings.BOOLEAN, neoEnabled );

// CSV settings // CSV settings
@Description( "Set to `true` to enable exporting metrics to CSV files" ) @Description( "Set to `true` to enable exporting metrics to CSV files" )
public static Setting<Boolean> csvEnabled = setting( "metrics.csv.enabled", Settings.BOOLEAN, Settings.FALSE ); public static Setting<Boolean> csvEnabled = setting( "metrics.csv.enabled", Settings.BOOLEAN, Settings.FALSE );
Expand Down
Expand Up @@ -19,10 +19,9 @@
*/ */
package org.neo4j.metrics.source; package org.neo4j.metrics.source;


import java.util.function.Supplier;

import com.codahale.metrics.MetricRegistry; import com.codahale.metrics.MetricRegistry;


import java.util.function.Supplier;


import org.neo4j.coreedge.raft.CoreMetaData; import org.neo4j.coreedge.raft.CoreMetaData;
import org.neo4j.io.pagecache.monitoring.PageCacheCounters; import org.neo4j.io.pagecache.monitoring.PageCacheCounters;
Expand All @@ -42,6 +41,7 @@
import org.neo4j.metrics.output.EventReporter; import org.neo4j.metrics.output.EventReporter;
import org.neo4j.metrics.source.cluster.ClusterMetrics; import org.neo4j.metrics.source.cluster.ClusterMetrics;
import org.neo4j.metrics.source.cluster.NetworkMetrics; import org.neo4j.metrics.source.cluster.NetworkMetrics;
import org.neo4j.metrics.source.db.BoltMetrics;
import org.neo4j.metrics.source.db.CheckPointingMetrics; import org.neo4j.metrics.source.db.CheckPointingMetrics;
import org.neo4j.metrics.source.db.CypherMetrics; import org.neo4j.metrics.source.db.CypherMetrics;
import org.neo4j.metrics.source.db.EntityCountMetrics; import org.neo4j.metrics.source.db.EntityCountMetrics;
Expand Down Expand Up @@ -170,6 +170,12 @@ public boolean build()
result = true; result = true;
} }


if( config.get( MetricsSettings.boltMessagesEnabled ))
{
life.add( new BoltMetrics( registry, dependencies.monitors() ));
result = true;
}

if ( config.get( MetricsSettings.jvmMemoryEnabled ) ) if ( config.get( MetricsSettings.jvmMemoryEnabled ) )
{ {
life.add( new MemoryPoolMetrics( registry ) ); life.add( new MemoryPoolMetrics( registry ) );
Expand Down
@@ -0,0 +1,119 @@
/*
* Copyright (c) 2002-2016 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.metrics.source.db;

import com.codahale.metrics.Gauge;
import com.codahale.metrics.MetricRegistry;

import java.util.concurrent.atomic.AtomicLong;

import org.neo4j.bolt.v1.runtime.MonitoredSessions;
import org.neo4j.kernel.impl.annotations.Documented;
import org.neo4j.kernel.lifecycle.LifecycleAdapter;
import org.neo4j.kernel.monitoring.Monitors;

import static com.codahale.metrics.MetricRegistry.name;

@Documented( ".Bolt Metrics" )
public class BoltMetrics extends LifecycleAdapter
{
private static final String NAME_PREFIX = "neo4j.bolt";

@Documented( "The total number of messages received via Bolt since this instance started." )
public static final String MESSAGES_RECIEVED = name( NAME_PREFIX, "messages_received" );
@Documented( "The total number of messages work has started on since this instance started. This is different " +
"from messages received in that this counter tracks how many of the received messages have" +
"been taken on by a worker thread." )
public static final String MESSAGES_STARTED = name( NAME_PREFIX, "messages_started" );
@Documented( "The total number of messages work has completed on since this instance started. This includes " +
"successful, failed and ignored Bolt messages." )
public static final String MESSAGES_DONE = name( NAME_PREFIX, "messages_done" );

@Documented( "The accumulated time messages have spent waiting for a worker thread." )
public static final String TOTAL_QUEUE_TIME = name( NAME_PREFIX, "accumulated_queue_time" );
@Documented( "The accumulated time worker threads have spent processing messages." )
public static final String TOTAL_PROCESSING_TIME = name( NAME_PREFIX, "accumulated_processing_time" );


private final MetricRegistry registry;
private final Monitors monitors;
private final BoltMetricsMonitor boltMonitor = new BoltMetricsMonitor();

public BoltMetrics( MetricRegistry registry, Monitors monitors )
{
this.registry = registry;
this.monitors = monitors;
}

@Override
public void start()
{
monitors.addMonitorListener( boltMonitor );
registry.register( MESSAGES_RECIEVED, (Gauge<Long>) boltMonitor.recieved::get );
registry.register( MESSAGES_STARTED, (Gauge<Long>) boltMonitor.started::get );
registry.register( MESSAGES_DONE, (Gauge<Long>) boltMonitor.done::get );
registry.register( TOTAL_QUEUE_TIME, (Gauge<Long>) boltMonitor.queueTime::get );
registry.register( TOTAL_PROCESSING_TIME, (Gauge<Long>) boltMonitor.processingTime::get );
}

@Override
public void stop()
{
registry.remove( MESSAGES_RECIEVED );
registry.remove( MESSAGES_STARTED );
registry.remove( MESSAGES_DONE );
registry.remove( TOTAL_QUEUE_TIME );
registry.remove( TOTAL_PROCESSING_TIME );
monitors.removeMonitorListener( boltMonitor );
}

private class BoltMetricsMonitor implements MonitoredSessions.SessionMonitor
{
public final AtomicLong recieved = new AtomicLong();
public final AtomicLong started = new AtomicLong();
public final AtomicLong done = new AtomicLong();

// It will take about 300 million years of queue/processing time to overflow these
// Even if we run a million processors concurrently, the instance would need to
// run uninterrupted for three hundred years before the monitoring had a hiccup.
public final AtomicLong queueTime = new AtomicLong();
public final AtomicLong processingTime = new AtomicLong();

@Override
public void messageReceived()
{
recieved.incrementAndGet();
}

@Override
public void processingStarted( long queueTime )
{
this.queueTime.addAndGet( queueTime );
started.incrementAndGet();
}

@Override
public void processingDone( long processingTime )
{
this.processingTime.addAndGet( processingTime );
done.incrementAndGet();
}
}
}

0 comments on commit 71640a2

Please sign in to comment.