Skip to content

Commit

Permalink
Report tracing events on statement force close,
Browse files Browse the repository at this point in the history
forcefully close statement after transaction commit or rollback only to catch all tracing events that can happen during transaction close.
  • Loading branch information
MishaDemianenko committed Feb 27, 2017
1 parent 5e2c43e commit a26e1ba
Show file tree
Hide file tree
Showing 7 changed files with 57 additions and 65 deletions.
Expand Up @@ -131,7 +131,6 @@ public final void close()
if ( cursor.pagedFile != null ) if ( cursor.pagedFile != null )
{ {
cursor.unpinCurrentPage(); cursor.unpinCurrentPage();
cursor.tracer.reportEvents();
cursor.releaseCursor(); cursor.releaseCursor();
// We null out the pagedFile field to allow it and its (potentially big) translation table to be garbage // We null out the pagedFile field to allow it and its (potentially big) translation table to be garbage
// collected when the file is unmapped, since the cursors can stick around in thread local caches, etc. // collected when the file is unmapped, since the cursors can stick around in thread local caches, etc.
Expand Down
Expand Up @@ -31,15 +31,6 @@


public class DefaultPageCursorTracer implements PageCursorTracer public class DefaultPageCursorTracer implements PageCursorTracer
{ {
private long totalPins = 0L;
private long totalUnpins = 0L;
private long totalFaults = 0L;
private long totalBytesRead = 0L;
private long totalBytesWritten = 0L;
private long totalEvictions = 0L;
private long totalEvictionExceptions = 0L;
private long totalFlushes = 0L;

private long pins; private long pins;
private long unpins; private long unpins;
private long faults; private long faults;
Expand Down Expand Up @@ -91,19 +82,11 @@ public void reportEvents()
{ {
pageCacheTracer.flushes( flushes ); pageCacheTracer.flushes( flushes );
} }
updateTotals(); reset();
} }


private void updateTotals() private void reset()
{ {
this.totalPins += pins;
this.totalUnpins += unpins;
this.totalFaults += faults;
this.totalBytesRead += bytesRead;
this.totalBytesWritten += bytesWritten;
this.totalEvictions += evictions;
this.totalEvictionExceptions += evictionExceptions;
this.totalFlushes += flushes;
pins = 0; pins = 0;
unpins = 0; unpins = 0;
faults = 0; faults = 0;
Expand All @@ -117,49 +100,49 @@ private void updateTotals()
@Override @Override
public long faults() public long faults()
{ {
return totalFaults + faults; return faults;
} }


@Override @Override
public long pins() public long pins()
{ {
return totalPins + pins; return pins;
} }


@Override @Override
public long unpins() public long unpins()
{ {
return totalUnpins + unpins; return unpins;
} }


@Override @Override
public long bytesRead() public long bytesRead()
{ {
return totalBytesRead + bytesRead; return bytesRead;
} }


@Override @Override
public long evictions() public long evictions()
{ {
return totalEvictions + evictions; return evictions;
} }


@Override @Override
public long evictionExceptions() public long evictionExceptions()
{ {
return totalEvictionExceptions + evictionExceptions; return evictionExceptions;
} }


@Override @Override
public long bytesWritten() public long bytesWritten()
{ {
return totalBytesWritten + bytesWritten; return bytesWritten;
} }


@Override @Override
public long flushes() public long flushes()
{ {
return totalFlushes + flushes; return flushes;
} }


@Override @Override
Expand Down
Expand Up @@ -2115,7 +2115,8 @@ public void tracerMustBeNotifiedAboutPinUnpinFaultAndEvictEventsWhenReading() th
DefaultPageCacheTracer tracer = new DefaultPageCacheTracer(); DefaultPageCacheTracer tracer = new DefaultPageCacheTracer();
generateFileWithRecords( file( "a" ), recordCount, recordSize ); generateFileWithRecords( file( "a" ), recordCount, recordSize );


getPageCache( fs, maxPages, pageCachePageSize, tracer, DefaultPageCursorTracerSupplier.INSTANCE ); DefaultPageCursorTracerSupplier cursorTracerSupplier = DefaultPageCursorTracerSupplier.INSTANCE;
getPageCache( fs, maxPages, pageCachePageSize, tracer, cursorTracerSupplier );


long countedPages = 0; long countedPages = 0;
long countedFaults = 0; long countedFaults = 0;
Expand Down Expand Up @@ -2145,6 +2146,8 @@ public void tracerMustBeNotifiedAboutPinUnpinFaultAndEvictEventsWhenReading() th
} }
} }


cursorTracerSupplier.get().reportEvents();

assertThat( "wrong count of pins", tracer.pins(), is( countedPages ) ); assertThat( "wrong count of pins", tracer.pins(), is( countedPages ) );
assertThat( "wrong count of unpins", tracer.unpins(), is( countedPages ) ); assertThat( "wrong count of unpins", tracer.unpins(), is( countedPages ) );


Expand Down Expand Up @@ -2172,7 +2175,8 @@ public void tracerMustBeNotifiedAboutPinUnpinFaultFlushAndEvictionEventsWhenWrit
long pagesToGenerate = 142; long pagesToGenerate = 142;
DefaultPageCacheTracer tracer = new DefaultPageCacheTracer(); DefaultPageCacheTracer tracer = new DefaultPageCacheTracer();


getPageCache( fs, maxPages, pageCachePageSize, tracer, DefaultPageCursorTracerSupplier.INSTANCE ); DefaultPageCursorTracerSupplier tracerSupplier = DefaultPageCursorTracerSupplier.INSTANCE;
getPageCache( fs, maxPages, pageCachePageSize, tracer, tracerSupplier );


try ( PagedFile pagedFile = pageCache.map( file( "a" ), filePageSize ); try ( PagedFile pagedFile = pageCache.map( file( "a" ), filePageSize );
PageCursor cursor = pagedFile.io( 0, PF_SHARED_WRITE_LOCK ) ) PageCursor cursor = pagedFile.io( 0, PF_SHARED_WRITE_LOCK ) )
Expand All @@ -2191,6 +2195,7 @@ public void tracerMustBeNotifiedAboutPinUnpinFaultFlushAndEvictionEventsWhenWrit
assertTrue( cursor.next( 0 ) ); assertTrue( cursor.next( 0 ) );
assertTrue( cursor.next( 0 ) ); assertTrue( cursor.next( 0 ) );
} }
tracerSupplier.get().reportEvents();


assertThat( "wrong count of pins", tracer.pins(), is( pagesToGenerate + 1 ) ); assertThat( "wrong count of pins", tracer.pins(), is( pagesToGenerate + 1 ) );
assertThat( "wrong count of unpins", tracer.unpins(), is( pagesToGenerate + 1 ) ); assertThat( "wrong count of unpins", tracer.unpins(), is( pagesToGenerate + 1 ) );
Expand Down
Expand Up @@ -103,6 +103,7 @@ public void mustEvictCleanPageWithoutFlushing() throws Exception
{ {
assertTrue( cursor.next() ); assertTrue( cursor.next() );
} }
cursorTracer.reportEvents();
assertNotNull( cursorTracer.observe( Fault.class ) ); assertNotNull( cursorTracer.observe( Fault.class ) );
assertEquals( 1, cursorTracer.faults() ); assertEquals( 1, cursorTracer.faults() );
assertEquals( 1, tracer.faults() ); assertEquals( 1, tracer.faults() );
Expand Down Expand Up @@ -140,6 +141,7 @@ public void mustFlushDirtyPagesOnEvictingFirstPage() throws Exception
assertTrue( cursor.next() ); assertTrue( cursor.next() );
cursor.putLong( 0L ); cursor.putLong( 0L );
} }
cursorTracer.reportEvents();
assertNotNull( cursorTracer.observe( Fault.class ) ); assertNotNull( cursorTracer.observe( Fault.class ) );
assertEquals( 1, cursorTracer.faults() ); assertEquals( 1, cursorTracer.faults() );
assertEquals( 1, tracer.faults() ); assertEquals( 1, tracer.faults() );
Expand Down Expand Up @@ -173,6 +175,7 @@ public void mustFlushDirtyPagesOnEvictingLastPage() throws Exception
assertTrue( cursor.next() ); assertTrue( cursor.next() );
cursor.putLong( 0L ); cursor.putLong( 0L );
} }
cursorTracer.reportEvents();
assertNotNull( cursorTracer.observe( Fault.class ) ); assertNotNull( cursorTracer.observe( Fault.class ) );
assertEquals( 1, cursorTracer.faults() ); assertEquals( 1, cursorTracer.faults() );
assertEquals( 1, tracer.faults() ); assertEquals( 1, tracer.faults() );
Expand Down Expand Up @@ -209,6 +212,7 @@ public void mustFlushDirtyPagesOnEvictingAllPages() throws Exception
cursor.putLong( 0L ); cursor.putLong( 0L );
assertFalse( cursor.next() ); assertFalse( cursor.next() );
} }
cursorTracer.reportEvents();
assertNotNull( cursorTracer.observe( Fault.class ) ); assertNotNull( cursorTracer.observe( Fault.class ) );
assertNotNull( cursorTracer.observe( Fault.class ) ); assertNotNull( cursorTracer.observe( Fault.class ) );
assertEquals( 2, cursorTracer.faults() ); assertEquals( 2, cursorTracer.faults() );
Expand Down
Expand Up @@ -24,7 +24,7 @@


import org.neo4j.graphdb.NotInTransactionException; import org.neo4j.graphdb.NotInTransactionException;
import org.neo4j.graphdb.TransactionTerminatedException; import org.neo4j.graphdb.TransactionTerminatedException;
import org.neo4j.io.pagecache.tracing.cursor.PageCursorCounters; import org.neo4j.io.pagecache.tracing.cursor.PageCursorTracer;
import org.neo4j.kernel.api.DataWriteOperations; import org.neo4j.kernel.api.DataWriteOperations;
import org.neo4j.kernel.api.query.ExecutingQuery; import org.neo4j.kernel.api.query.ExecutingQuery;
import org.neo4j.kernel.api.ProcedureCallOperations; import org.neo4j.kernel.api.ProcedureCallOperations;
Expand Down Expand Up @@ -53,14 +53,14 @@
* <ol> * <ol>
* <li>Construct {@link KernelStatement} when {@link KernelTransactionImplementation} is constructed</li> * <li>Construct {@link KernelStatement} when {@link KernelTransactionImplementation} is constructed</li>
* <li>For every transaction...</li> * <li>For every transaction...</li>
* <li>Call {@link #initialize(StatementLocks, StatementOperationParts, PageCursorCounters)} which makes this instance * <li>Call {@link #initialize(StatementLocks, StatementOperationParts, PageCursorTracer)} which makes this instance
* full available and ready to use. Call when the {@link KernelTransactionImplementation} is initialized.</li> * full available and ready to use. Call when the {@link KernelTransactionImplementation} is initialized.</li>
* <li>Alternate {@link #acquire()} / {@link #close()} when acquiring / closing a statement for the transaction... * <li>Alternate {@link #acquire()} / {@link #close()} when acquiring / closing a statement for the transaction...
* Temporarily asymmetric number of calls to {@link #acquire()} / {@link #close()} is supported, although in * Temporarily asymmetric number of calls to {@link #acquire()} / {@link #close()} is supported, although in
* the end an equal number of calls must have been issued.</li> * the end an equal number of calls must have been issued.</li>
* <li>To be safe call {@link #forceClose()} at the end of a transaction to force a close of the statement, * <li>To be safe call {@link #forceClose()} at the end of a transaction to force a close of the statement,
* even if there are more than one current call to {@link #acquire()}. This instance is now again ready * even if there are more than one current call to {@link #acquire()}. This instance is now again ready
* to be {@link #initialize(StatementLocks, StatementOperationParts, PageCursorCounters)} initialized} and used for the transaction * to be {@link #initialize(StatementLocks, StatementOperationParts, PageCursorTracer)} initialized} and used for the transaction
* instance again, when it's initialized.</li> * instance again, when it's initialized.</li>
* </ol> * </ol>
*/ */
Expand All @@ -72,7 +72,7 @@ public class KernelStatement implements TxStateHolder, Statement
private final KernelTransactionImplementation transaction; private final KernelTransactionImplementation transaction;
private final OperationsFacade facade; private final OperationsFacade facade;
private StatementLocks statementLocks; private StatementLocks statementLocks;
private PageCursorCounters pageCursorCounters; private PageCursorTracer pageCursorTracer = PageCursorTracer.NULL;
private int referenceCount; private int referenceCount;
private volatile ExecutingQueryList executingQueryList; private volatile ExecutingQueryList executingQueryList;
private final LockTracer systemLockTracer; private final LockTracer systemLockTracer;
Expand Down Expand Up @@ -186,10 +186,10 @@ void assertOpen()
} }


public void initialize( StatementLocks statementLocks, StatementOperationParts operationParts, public void initialize( StatementLocks statementLocks, StatementOperationParts operationParts,
PageCursorCounters pageCursorCounters ) PageCursorTracer pageCursorCounters )
{ {
this.statementLocks = statementLocks; this.statementLocks = statementLocks;
this.pageCursorCounters = pageCursorCounters; this.pageCursorTracer = pageCursorCounters;
facade.initialize( operationParts ); facade.initialize( operationParts );
} }


Expand All @@ -204,9 +204,9 @@ public LockTracer lockTracer()
return tracer == null ? systemLockTracer : systemLockTracer.combine( tracer ); return tracer == null ? systemLockTracer : systemLockTracer.combine( tracer );
} }


public PageCursorCounters getPageCursorCounters() public PageCursorTracer getPageCursorTracer()
{ {
return pageCursorCounters; return pageCursorTracer;
} }


public final void acquire() public final void acquire()
Expand All @@ -229,6 +229,7 @@ final void forceClose()
referenceCount = 0; referenceCount = 0;
cleanupResources(); cleanupResources();
} }
pageCursorTracer.reportEvents();
} }


final String username() final String username()
Expand Down
Expand Up @@ -445,7 +445,6 @@ public long closeTransaction() throws TransactionFailureException
{ {
assertTransactionOpen(); assertTransactionOpen();
assertTransactionNotClosing(); assertTransactionNotClosing();
closeCurrentStatementIfAny();
closing = true; closing = true;
try try
{ {
Expand Down
Expand Up @@ -49,7 +49,8 @@
import org.neo4j.test.rule.RepeatRule; import org.neo4j.test.rule.RepeatRule;
import org.neo4j.test.rule.TestDirectory; import org.neo4j.test.rule.TestDirectory;


import static org.junit.Assert.assertEquals; import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.junit.Assert.assertThat;


public class PageCacheCountersIT public class PageCacheCountersIT
{ {
Expand Down Expand Up @@ -92,23 +93,23 @@ public void pageCacheCountersAreSumOfPageCursorCounters() throws Exception
long initialFlushes = pageCacheTracer.flushes(); long initialFlushes = pageCacheTracer.flushes();


startNodeCreators( nodeCreators, nodeCreatorFutures ); startNodeCreators( nodeCreators, nodeCreatorFutures );
TimeUnit.MILLISECONDS.sleep( 250 ); TimeUnit.MILLISECONDS.sleep( 50 );
stopNodeCreators( nodeCreators, nodeCreatorFutures ); stopNodeCreators( nodeCreators, nodeCreatorFutures );


assertEquals( "Number of pins events in page cache tracer should equal to the sum of pin events in page " + assertThat( "Number of pins events in page cache tracer should equal to the sum of pin events in page cursor tracers.",
"cursor tracers.", pageCacheTracer.pins(), sumCounters( nodeCreators, NodeCreator::getPins, initialPins ) ); pageCacheTracer.pins(), greaterThanOrEqualTo( sumCounters( nodeCreators, NodeCreator::getPins, initialPins ) ) );
assertEquals( "Number of unpins events in page cache tracer should equal to the sum of unpin events in page " + assertThat( "Number of unpins events in page cache tracer should equal to the sum of unpin events in page cursor tracers.",
"cursor tracers.", pageCacheTracer.unpins(), sumCounters( nodeCreators, NodeCreator::getUnpins, initialUnpins ) ); pageCacheTracer.unpins(), greaterThanOrEqualTo( sumCounters( nodeCreators, NodeCreator::getUnpins, initialUnpins ) ) );
assertEquals( "Number of initialBytesRead in page cache tracer should equal to the sum of initialBytesRead in page " + assertThat( "Number of initialBytesRead in page cache tracer should equal to the sum of initialBytesRead in page cursor tracers.",
"cursor tracers.", pageCacheTracer.bytesRead(), sumCounters( nodeCreators, NodeCreator::getBytesRead, initialBytesRead ) ); pageCacheTracer.bytesRead(), greaterThanOrEqualTo( sumCounters( nodeCreators, NodeCreator::getBytesRead, initialBytesRead ) ) );
assertEquals( "Number of bytesWritten in page cache tracer should equal to the sum of bytesWritten in " + assertThat( "Number of bytesWritten in page cache tracer should equal to the sum of bytesWritten in page cursor tracers.",
"page cursor tracers.", pageCacheTracer.bytesWritten(), sumCounters( nodeCreators, NodeCreator::getBytesWritten, initialBytesWritten ) ); pageCacheTracer.bytesWritten(), greaterThanOrEqualTo( sumCounters( nodeCreators, NodeCreator::getBytesWritten, initialBytesWritten ) ) );
assertEquals( "Number of evictions in page cache tracer should equal to the sum of evictions in " + assertThat( "Number of evictions in page cache tracer should equal to the sum of evictions in page cursor tracers.",
"page cursor tracers.", pageCacheTracer.evictions(), sumCounters( nodeCreators, NodeCreator::getEvictions, initialEvictions ) ); pageCacheTracer.evictions(), greaterThanOrEqualTo( sumCounters( nodeCreators, NodeCreator::getEvictions, initialEvictions ) ) );
assertEquals( "Number of faults in page cache tracer should equal to the sum of faults in " + "page cursor tracers.", assertThat( "Number of faults in page cache tracer should equal to the sum of faults in " + "page cursor tracers.",
pageCacheTracer.faults(), sumCounters( nodeCreators, NodeCreator::getFaults, initialFaults ) ); pageCacheTracer.faults(), greaterThanOrEqualTo( sumCounters( nodeCreators, NodeCreator::getFaults, initialFaults ) ) );
assertEquals( "Number of flushes in page cache tracer should equal to the sum of flushes in " + assertThat( "Number of flushes in page cache tracer should equal to the sum of flushes in page cursor tracers.",
"page cursor tracers.", pageCacheTracer.flushes(), sumCounters( nodeCreators, NodeCreator::getFlushes, initialFlushes ) ); pageCacheTracer.flushes(), greaterThanOrEqualTo( sumCounters( nodeCreators, NodeCreator::getFlushes, initialFlushes ) ) );
} }


private void stopNodeCreators( List<NodeCreator> nodeCreators, List<Future> nodeCreatorFutures ) private void stopNodeCreators( List<NodeCreator> nodeCreators, List<Future> nodeCreatorFutures )
Expand All @@ -133,7 +134,7 @@ private void startNodeCreators( List<NodeCreator> nodeCreators, List<Future> nod


private long sumCounters( List<NodeCreator> nodeCreators, ToLongFunction<NodeCreator> mapper, long initialValue ) private long sumCounters( List<NodeCreator> nodeCreators, ToLongFunction<NodeCreator> mapper, long initialValue )
{ {
return nodeCreators.stream().mapToLong( mapper ).reduce( initialValue, Long::sum ); return nodeCreators.stream().mapToLong( mapper ).sum() + initialValue;
} }


private PageCacheTracer getPageCacheTracer( GraphDatabaseService db ) private PageCacheTracer getPageCacheTracer( GraphDatabaseService db )
Expand Down Expand Up @@ -169,27 +170,27 @@ public void run()
try ( Transaction transaction = db.beginTx() ) try ( Transaction transaction = db.beginTx() )
{ {
KernelStatement kernelStatement = getKernelStatement( (GraphDatabaseAPI) db ); KernelStatement kernelStatement = getKernelStatement( (GraphDatabaseAPI) db );
pageCursorCounters = kernelStatement.getPageCursorCounters(); pageCursorCounters = kernelStatement.getPageCursorTracer();
Node node = db.createNode(); Node node = db.createNode();
node.setProperty( "name", RandomStringUtils.random( localRandom.nextInt( 100 ) ) ); node.setProperty( "name", RandomStringUtils.random( localRandom.nextInt( 100 ) ) );
node.setProperty( "surname", RandomStringUtils.random( localRandom.nextInt( 100 ) ) ); node.setProperty( "surname", RandomStringUtils.random( localRandom.nextInt( 100 ) ) );
node.setProperty( "age", localRandom.nextInt( 100 ) ); node.setProperty( "age", localRandom.nextInt( 100 ) );
transaction.success(); transaction.success();
storeCounters( pageCursorCounters );
} }
storeCounters( pageCursorCounters );
} }
} }


private void storeCounters( PageCursorCounters pageCursorCounters ) private void storeCounters( PageCursorCounters pageCursorCounters )
{ {
Objects.nonNull( pageCursorCounters ); Objects.nonNull( pageCursorCounters );
pins = pageCursorCounters.pins(); pins +=pageCursorCounters.pins();
unpins = pageCursorCounters.unpins(); unpins += pageCursorCounters.unpins();
bytesRead = pageCursorCounters.bytesRead(); bytesRead += pageCursorCounters.bytesRead();
bytesWritten = pageCursorCounters.bytesWritten(); bytesWritten += pageCursorCounters.bytesWritten();
evictions = pageCursorCounters.evictions(); evictions += pageCursorCounters.evictions();
faults = pageCursorCounters.faults(); faults += pageCursorCounters.faults();
flushes = pageCursorCounters.flushes(); flushes += pageCursorCounters.flushes();
} }


@Override @Override
Expand Down

0 comments on commit a26e1ba

Please sign in to comment.