Skip to content

Commit

Permalink
Make CheckPointTracer and TransactionTracers monitorized
Browse files Browse the repository at this point in the history
The injected monitors are called whenever an event has been completed.
Listeners can be registered as usual by using the
Monitors.addMonitorListener(...).
  • Loading branch information
davidegrohmann committed Dec 9, 2015
1 parent 56e602c commit 5c4a185
Show file tree
Hide file tree
Showing 15 changed files with 205 additions and 85 deletions.
Expand Up @@ -32,10 +32,19 @@
import org.neo4j.kernel.impl.transaction.tracing.StoreApplyEvent;
import org.neo4j.kernel.impl.transaction.tracing.TransactionEvent;
import org.neo4j.kernel.impl.transaction.tracing.TransactionTracer;
import org.neo4j.kernel.impl.util.JobScheduler;

public class DefaultTransactionTracer implements TransactionTracer, LogRotationMonitor
{
public interface Monitor
{
void lastLogRotationEventDuration( long millis );
}

private final Clock clock;
private final Monitor monitor;
private final JobScheduler jobScheduler;

private final AtomicLong counter = new AtomicLong();
private final AtomicLong accumulatedTotalTimeNanos = new AtomicLong();

Expand All @@ -46,7 +55,7 @@ public class DefaultTransactionTracer implements TransactionTracer, LogRotationM
@Override
public void close()
{
accumulatedTotalTimeNanos.addAndGet( clock.nanoTime() - startTimeNanos );
updateCountersAndNotifyListeners();
}
};

Expand All @@ -67,7 +76,6 @@ public void setLogRotated( boolean logRotated )
public LogRotateEvent beginLogRotate()
{
startTimeNanos = clock.nanoTime();
counter.incrementAndGet();
return logRotateEvent;
}

Expand Down Expand Up @@ -145,14 +153,16 @@ public void setReadOnly( boolean wasReadOnly )
}
};

public DefaultTransactionTracer()
public DefaultTransactionTracer( Monitor monitor, JobScheduler jobScheduler )
{
this( Clock.SYSTEM_CLOCK );
this( Clock.SYSTEM_CLOCK, monitor, jobScheduler );
}

public DefaultTransactionTracer( Clock clock )
public DefaultTransactionTracer( Clock clock, Monitor monitor, JobScheduler jobScheduler )
{
this.clock = clock;
this.monitor = monitor;
this.jobScheduler = jobScheduler;
}

@Override
Expand All @@ -172,4 +182,15 @@ public long logRotationAccumulatedTotalTimeMillis()
{
return TimeUnit.NANOSECONDS.toMillis( accumulatedTotalTimeNanos.get() );
}

private void updateCountersAndNotifyListeners()
{
counter.incrementAndGet();
long lastEventTime = clock.nanoTime() - startTimeNanos;
accumulatedTotalTimeNanos.addAndGet( lastEventTime );
jobScheduler.schedule( JobScheduler.Groups.metricsEvent, () -> {
long millis = TimeUnit.NANOSECONDS.toMillis( lastEventTime );
monitor.lastLogRotationEventDuration( millis );
} );
}
}
Expand Up @@ -147,8 +147,8 @@ public DependencyResolver get()
new JvmMetadataRepository() ).checkJvmCompatibilityAndIssueWarning();

String desiredImplementationName = config.get( GraphDatabaseFacadeFactory.Configuration.tracer );
tracers = dependencies.satisfyDependency(
new Tracers( desiredImplementationName, logging.getInternalLog( Tracers.class ) ) );
tracers = dependencies.satisfyDependency( new Tracers( desiredImplementationName,
logging.getInternalLog( Tracers.class ), monitors, jobScheduler ) );
dependencies.satisfyDependency( tracers.pageCacheTracer );
dependencies.satisfyDependency( tracers.transactionTracer );
dependencies.satisfyDependency( tracers.checkPointTracer );
Expand Down
Expand Up @@ -27,10 +27,19 @@
import org.neo4j.kernel.impl.transaction.tracing.LogCheckPointEvent;
import org.neo4j.kernel.impl.transaction.tracing.LogForceEvent;
import org.neo4j.kernel.impl.transaction.tracing.LogForceWaitEvent;
import org.neo4j.kernel.impl.util.JobScheduler;

public class DefaultCheckPointerTracer implements CheckPointTracer, CheckPointerMonitor
{
public interface Monitor
{
void lastCheckPointEventDuration( long millis );
}

private final Clock clock;
private final Monitor monitor;
private final JobScheduler jobScheduler;

private final AtomicLong counter = new AtomicLong();
private final AtomicLong accumulatedTotalTimeNanos = new AtomicLong();

Expand All @@ -41,13 +50,12 @@ public class DefaultCheckPointerTracer implements CheckPointTracer, CheckPointer
@Override
public void close()
{
accumulatedTotalTimeNanos.addAndGet( clock.nanoTime() - startTimeNanos );
updateCountersAndNotifyListeners();
}

@Override
public LogForceWaitEvent beginLogForceWait()
{

return LogForceWaitEvent.NULL;
}

Expand All @@ -58,21 +66,22 @@ public LogForceEvent beginLogForce()
}
};

public DefaultCheckPointerTracer()
public DefaultCheckPointerTracer( Monitor monitor, JobScheduler jobScheduler )
{
this( Clock.SYSTEM_CLOCK );
this( Clock.SYSTEM_CLOCK, monitor, jobScheduler );
}

DefaultCheckPointerTracer( Clock clock )
public DefaultCheckPointerTracer( Clock clock, Monitor monitor, JobScheduler jobScheduler )
{
this.clock = clock;
this.monitor = monitor;
this.jobScheduler = jobScheduler;
}

@Override
public LogCheckPointEvent beginCheckPoint()
{
startTimeNanos = clock.nanoTime();
counter.incrementAndGet();
return logCheckPointEvent;
}

Expand All @@ -87,4 +96,19 @@ public long checkPointAccumulatedTotalTimeMillis()
{
return TimeUnit.NANOSECONDS.toMillis( accumulatedTotalTimeNanos.get() );
}

private void updateCountersAndNotifyListeners()
{
final long lastEventTime = clock.nanoTime() - startTimeNanos;

// update counters
counter.incrementAndGet();
accumulatedTotalTimeNanos.addAndGet( lastEventTime );

// notify async
jobScheduler.schedule( JobScheduler.Groups.metricsEvent, () -> {
long millis = TimeUnit.NANOSECONDS.toMillis( lastEventTime );
monitor.lastCheckPointEventDuration( millis );
} );
}
}
Expand Up @@ -145,6 +145,11 @@ class Groups
* Network IO threads for the Bolt protocol.
*/
public static final Group boltNetworkIO = new Group( "BoltNetworkIO", NEW_THREAD );

/**
* Reporting thread for Metrics events
*/
public static final Group metricsEvent = new Group( "MetricsEvent", POOLED );
}

interface JobHandle
Expand Down
Expand Up @@ -25,6 +25,8 @@
import org.neo4j.kernel.impl.transaction.log.checkpoint.DefaultCheckPointerTracer;
import org.neo4j.kernel.impl.transaction.tracing.CheckPointTracer;
import org.neo4j.kernel.impl.transaction.tracing.TransactionTracer;
import org.neo4j.kernel.impl.util.JobScheduler;
import org.neo4j.kernel.monitoring.Monitors;

/**
* The default TracerFactory, when nothing else is otherwise configured.
Expand All @@ -38,20 +40,22 @@ public String getImplementationName()
}

@Override
public PageCacheTracer createPageCacheTracer()
public PageCacheTracer createPageCacheTracer( Monitors monitors, JobScheduler jobScheduler )
{
return new DefaultPageCacheTracer();
}

@Override
public TransactionTracer createTransactionTracer()
public TransactionTracer createTransactionTracer( Monitors monitors, JobScheduler jobScheduler )
{
return new DefaultTransactionTracer();
DefaultTransactionTracer.Monitor monitor = monitors.newMonitor( DefaultTransactionTracer.Monitor.class );
return new DefaultTransactionTracer( monitor, jobScheduler );
}

@Override
public CheckPointTracer createCheckPointTracer()
public CheckPointTracer createCheckPointTracer( Monitors monitors, JobScheduler jobScheduler )
{
return new DefaultCheckPointerTracer();
DefaultCheckPointerTracer.Monitor monitor = monitors.newMonitor( DefaultCheckPointerTracer.Monitor.class );
return new DefaultCheckPointerTracer( monitor, jobScheduler );
}
}
Expand Up @@ -22,6 +22,8 @@
import org.neo4j.io.pagecache.tracing.PageCacheTracer;
import org.neo4j.kernel.impl.transaction.tracing.CheckPointTracer;
import org.neo4j.kernel.impl.transaction.tracing.TransactionTracer;
import org.neo4j.kernel.impl.util.JobScheduler;
import org.neo4j.kernel.monitoring.Monitors;

/**
* A TracerFactory determines the implementation of the tracers, that a database should use. Each implementation has
Expand All @@ -38,19 +40,25 @@ public interface TracerFactory

/**
* Create a new PageCacheTracer instance.
*
* @param jobScheduler a scheduler for async jobs
* @return The created instance.
*/
PageCacheTracer createPageCacheTracer();
PageCacheTracer createPageCacheTracer( Monitors monitors, JobScheduler jobScheduler );

/**
* Create a new TransactionTracer instance.
*
* @param jobScheduler a scheduler for async jobs
* @return The created instance.
*/
TransactionTracer createTransactionTracer();
TransactionTracer createTransactionTracer( Monitors monitors, JobScheduler jobScheduler );

/**
* Create a new CheckPointTracer instance.
*
* @param jobScheduler a scheduler for async jobs
* @return The created instance.
*/
CheckPointTracer createCheckPointTracer();
CheckPointTracer createCheckPointTracer( Monitors monitors, JobScheduler jobScheduler );
}
Expand Up @@ -23,6 +23,8 @@
import org.neo4j.io.pagecache.tracing.PageCacheTracer;
import org.neo4j.kernel.impl.transaction.tracing.CheckPointTracer;
import org.neo4j.kernel.impl.transaction.tracing.TransactionTracer;
import org.neo4j.kernel.impl.util.JobScheduler;
import org.neo4j.kernel.monitoring.Monitors;
import org.neo4j.logging.Log;

/**
Expand Down Expand Up @@ -106,7 +108,7 @@ public class Tracers
* .TracerFactory} implementation, as given by its {@link TracerFactory#getImplementationName()} method.
* @param msgLog A {@link Log} for logging when the desired implementation cannot be created.
*/
public Tracers( String desiredImplementationName, Log msgLog )
public Tracers( String desiredImplementationName, Log msgLog, Monitors monitors, JobScheduler jobScheduler )
{
if ( "null".equalsIgnoreCase( desiredImplementationName ) )
{
Expand Down Expand Up @@ -141,9 +143,9 @@ public Tracers( String desiredImplementationName, Log msgLog )
msgLog.warn( "Using default tracer implementations instead of '%s'", desiredImplementationName );
}

pageCacheTracer = foundFactory.createPageCacheTracer();
transactionTracer = foundFactory.createTransactionTracer();
checkPointTracer = foundFactory.createCheckPointTracer();
pageCacheTracer = foundFactory.createPageCacheTracer( monitors, jobScheduler );
transactionTracer = foundFactory.createTransactionTracer( monitors, jobScheduler );
checkPointTracer = foundFactory.createCheckPointTracer( monitors, jobScheduler );
}
}
}
Expand Up @@ -19,6 +19,7 @@
*/
package org.neo4j.kernel;

import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.mockito.invocation.InvocationOnMock;
Expand All @@ -37,6 +38,7 @@
import org.neo4j.kernel.impl.factory.PlatformModule;
import org.neo4j.kernel.lifecycle.LifeSupport;
import org.neo4j.kernel.lifecycle.Lifecycle;
import org.neo4j.kernel.monitoring.Monitors;
import org.neo4j.test.TargetDirectory;

import static org.junit.Assert.assertEquals;
Expand All @@ -46,6 +48,7 @@
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

public class GraphDatabaseFacadeFactoryTest
{
Expand All @@ -57,6 +60,12 @@ public class GraphDatabaseFacadeFactoryTest
private final GraphDatabaseFacadeFactory.Dependencies deps =
mock( GraphDatabaseFacadeFactory.Dependencies.class, RETURNS_MOCKS );

@Before
public void setup()
{
when( deps.monitors() ).thenReturn( new Monitors() );
}

@Test
public void shouldThrowAppropriateExceptionIfStartFails()
{
Expand Down

0 comments on commit 5c4a185

Please sign in to comment.