Skip to content

Commit

Permalink
Global and database local transaction statistics.
Browse files Browse the repository at this point in the history
Introduce possibility to track transaction statistic per database, while
keeping possibility to get aggregated number for whole server.

Change local database counters from using AtomicLong to LongAdder.
  • Loading branch information
MishaDemianenko committed Aug 14, 2018
1 parent de1a981 commit 438cb96
Show file tree
Hide file tree
Showing 26 changed files with 161 additions and 134 deletions.
Expand Up @@ -35,7 +35,7 @@ import org.neo4j.kernel.impl.coreapi.{InternalTransaction, PropertyContainerLock
import org.neo4j.kernel.impl.factory.GraphDatabaseFacade import org.neo4j.kernel.impl.factory.GraphDatabaseFacade
import org.neo4j.kernel.impl.query._ import org.neo4j.kernel.impl.query._
import org.neo4j.kernel.impl.query.clientconnection.ClientConnectionInfo import org.neo4j.kernel.impl.query.clientconnection.ClientConnectionInfo
import org.neo4j.kernel.impl.transaction.TransactionStats import org.neo4j.kernel.impl.transaction.stats.DatabaseTransactionStats
import org.neo4j.kernel.impl.util.ValueUtils.asMapValue import org.neo4j.kernel.impl.util.ValueUtils.asMapValue


import scala.collection.JavaConverters._ import scala.collection.JavaConverters._
Expand Down Expand Up @@ -151,7 +151,7 @@ trait GraphIcing {


def txCounts = TxCounts(txMonitor.getNumberOfCommittedTransactions, txMonitor.getNumberOfRolledBackTransactions, txMonitor.getNumberOfActiveTransactions) def txCounts = TxCounts(txMonitor.getNumberOfCommittedTransactions, txMonitor.getNumberOfRolledBackTransactions, txMonitor.getNumberOfActiveTransactions)


private def txMonitor: TransactionStats = graph.getDependencyResolver.resolveDependency(classOf[TransactionStats]) private def txMonitor: DatabaseTransactionStats = graph.getDependencyResolver.resolveDependency(classOf[DatabaseTransactionStats])


private def txBridge: ThreadToStatementContextBridge = graph.getDependencyResolver.resolveDependency(classOf[ThreadToStatementContextBridge]) private def txBridge: ThreadToStatementContextBridge = graph.getDependencyResolver.resolveDependency(classOf[ThreadToStatementContextBridge])
} }
Expand Down
Expand Up @@ -24,7 +24,6 @@
import org.junit.Rule; import org.junit.Rule;
import org.junit.Test; import org.junit.Test;


import java.io.File;
import java.util.concurrent.Semaphore; import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;


Expand All @@ -33,9 +32,9 @@
import org.neo4j.graphdb.facade.GraphDatabaseFacadeFactory; import org.neo4j.graphdb.facade.GraphDatabaseFacadeFactory;
import org.neo4j.graphdb.factory.GraphDatabaseFactoryState; import org.neo4j.graphdb.factory.GraphDatabaseFactoryState;
import org.neo4j.graphdb.factory.module.CommunityEditionModule; import org.neo4j.graphdb.factory.module.CommunityEditionModule;
import org.neo4j.graphdb.factory.module.PlatformModule;
import org.neo4j.kernel.configuration.Config; import org.neo4j.kernel.configuration.Config;
import org.neo4j.kernel.impl.factory.DatabaseInfo; import org.neo4j.kernel.impl.factory.DatabaseInfo;
import org.neo4j.kernel.impl.transaction.stats.DatabaseTransactionStats;
import org.neo4j.test.rule.TestDirectory; import org.neo4j.test.rule.TestDirectory;


import static java.util.concurrent.TimeUnit.SECONDS; import static java.util.concurrent.TimeUnit.SECONDS;
Expand Down Expand Up @@ -118,45 +117,14 @@ private Thread startFirstTransactionWhichBlocksDuringPushUntilSecondTransactionF
private GraphDatabaseService createDb() private GraphDatabaseService createDb()
{ {
GraphDatabaseFactoryState state = new GraphDatabaseFactoryState(); GraphDatabaseFactoryState state = new GraphDatabaseFactoryState();
//noinspection deprecation return new GraphDatabaseFacadeFactory( DatabaseInfo.COMMUNITY, platformModule -> new CommunityEditionModule( platformModule )
return new GraphDatabaseFacadeFactory( DatabaseInfo.COMMUNITY, CommunityEditionModule::new )
{ {
@Override @Override
protected PlatformModule createPlatform( File storeDir, Config config, Dependencies dependencies ) public DatabaseTransactionStats createTransactionMonitor()
{ {
return new PlatformModule( storeDir, config, databaseInfo, dependencies ) return new SkipTransactionDatabaseStats();
{
@Override
protected TransactionStats createTransactionStats()
{
return new TransactionStats()
{
public boolean skip;

@Override
public void transactionFinished( boolean committed, boolean write )
{
super.transactionFinished( committed, write );

if ( committed )
{
// skip signal and waiting for second transaction
if ( skip )
{
return;
}
skip = true;

signalFirstTransactionStartedPushing();

waitForSecondTransactionToFinish();
}
}
};
}
};
} }
}.newFacade( storeLocation.storeDir(), Config.defaults(), state.databaseDependencies() ); } ).newFacade( storeLocation.storeDir(), Config.defaults(), state.databaseDependencies() );
} }


private void waitForFirstTransactionToStartPushing() throws InterruptedException private void waitForFirstTransactionToStartPushing() throws InterruptedException
Expand Down Expand Up @@ -193,4 +161,29 @@ private void waitForSecondTransactionToFinish()
reference.set( e ); reference.set( e );
} }
} }

private class SkipTransactionDatabaseStats extends DatabaseTransactionStats
{
boolean skip;

@Override
public void transactionFinished( boolean committed, boolean write )
{
super.transactionFinished( committed, write );

if ( committed )
{
// skip signal and waiting for second transaction
if ( skip )
{
return;
}
skip = true;

signalFirstTransactionStartedPushing();

waitForSecondTransactionToFinish();
}
}
}
} }
Expand Up @@ -29,6 +29,7 @@
import org.neo4j.function.ThrowingConsumer; import org.neo4j.function.ThrowingConsumer;
import org.neo4j.graphdb.GraphDatabaseService; import org.neo4j.graphdb.GraphDatabaseService;
import org.neo4j.graphdb.Transaction; import org.neo4j.graphdb.Transaction;
import org.neo4j.kernel.impl.transaction.stats.TransactionCounters;
import org.neo4j.kernel.internal.GraphDatabaseAPI; import org.neo4j.kernel.internal.GraphDatabaseAPI;
import org.neo4j.test.TestGraphDatabaseFactory; import org.neo4j.test.TestGraphDatabaseFactory;


Expand Down
Expand Up @@ -21,9 +21,8 @@


import java.time.Clock; import java.time.Clock;


import org.neo4j.kernel.impl.transaction.TransactionStats; import org.neo4j.kernel.impl.transaction.stats.TransactionCounters;
import org.neo4j.kernel.lifecycle.LifecycleAdapter; import org.neo4j.kernel.lifecycle.LifecycleAdapter;
import org.neo4j.time.Clocks;


import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.locks.LockSupport.parkNanos; import static java.util.concurrent.locks.LockSupport.parkNanos;
Expand All @@ -39,16 +38,18 @@ public class DatabaseAvailability extends LifecycleAdapter
{ {
private static final AvailabilityRequirement AVAILABILITY_REQUIREMENT = availabilityRequirement( "Database available" ); private static final AvailabilityRequirement AVAILABILITY_REQUIREMENT = availabilityRequirement( "Database available" );
private final AvailabilityGuard availabilityGuard; private final AvailabilityGuard availabilityGuard;
private final TransactionStats transactionMonitor; private final TransactionCounters transactionCounters;
private final Clock clock;
private final long awaitActiveTransactionDeadlineMillis; private final long awaitActiveTransactionDeadlineMillis;
private volatile boolean started; private volatile boolean started;


public DatabaseAvailability( AvailabilityGuard availabilityGuard, TransactionStats transactionMonitor, public DatabaseAvailability( AvailabilityGuard availabilityGuard, TransactionCounters transactionCounters, Clock clock,
long awaitActiveTransactionDeadlineMillis ) long awaitActiveTransactionDeadlineMillis )
{ {
this.availabilityGuard = availabilityGuard; this.availabilityGuard = availabilityGuard;
this.transactionMonitor = transactionMonitor; this.transactionCounters = transactionCounters;
this.awaitActiveTransactionDeadlineMillis = awaitActiveTransactionDeadlineMillis; this.awaitActiveTransactionDeadlineMillis = awaitActiveTransactionDeadlineMillis;
this.clock = clock;


// On initial setup, deny availability // On initial setup, deny availability
availabilityGuard.require( AVAILABILITY_REQUIREMENT ); availabilityGuard.require( AVAILABILITY_REQUIREMENT );
Expand Down Expand Up @@ -80,9 +81,8 @@ public boolean isStarted()


private void awaitTransactionsClosedWithinTimeout() private void awaitTransactionsClosedWithinTimeout()
{ {
Clock clock = Clocks.systemClock();
long deadline = clock.millis() + awaitActiveTransactionDeadlineMillis; long deadline = clock.millis() + awaitActiveTransactionDeadlineMillis;
while ( transactionMonitor.getNumberOfActiveTransactions() > 0 && clock.millis() < deadline ) while ( transactionCounters.getNumberOfActiveTransactions() > 0 && clock.millis() < deadline )
{ {
parkNanos( MILLISECONDS.toNanos( 10 ) ); parkNanos( MILLISECONDS.toNanos( 10 ) );
} }
Expand Down
Expand Up @@ -21,8 +21,6 @@


/** /**
* This monitor is used to track transaction management. It is called when client code starts, finishes, or terminates transactions. * This monitor is used to track transaction management. It is called when client code starts, finishes, or terminates transactions.
*
* In HA this refers only to what the local instance is doing, not what is happening globally in the whole cluster.
*/ */
public interface TransactionMonitor public interface TransactionMonitor
{ {
Expand Down
Expand Up @@ -17,27 +17,30 @@
* You should have received a copy of the GNU General Public License * You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
package org.neo4j.kernel.impl.transaction; package org.neo4j.kernel.impl.transaction.stats;


import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;


public class TransactionStats implements TransactionMonitor, TransactionCounters import org.neo4j.kernel.impl.transaction.TransactionMonitor;

public class DatabaseTransactionStats implements TransactionMonitor, TransactionCounters
{ {
private final AtomicLong startedTransactionCount = new AtomicLong();
private final AtomicLong activeReadTransactionCount = new AtomicLong(); private final AtomicLong activeReadTransactionCount = new AtomicLong();
private final AtomicLong activeWriteTransactionCount = new AtomicLong(); private final LongAdder startedTransactionCount = new LongAdder();
private final AtomicLong committedReadTransactionCount = new AtomicLong(); private final LongAdder activeWriteTransactionCount = new LongAdder();
private final AtomicLong committedWriteTransactionCount = new AtomicLong(); private final LongAdder committedReadTransactionCount = new LongAdder();
private final AtomicLong rolledBackReadTransactionCount = new AtomicLong(); private final LongAdder committedWriteTransactionCount = new LongAdder();
private final AtomicLong rolledBackWriteTransactionCount = new AtomicLong(); private final LongAdder rolledBackReadTransactionCount = new LongAdder();
private final AtomicLong terminatedReadTransactionCount = new AtomicLong(); private final LongAdder rolledBackWriteTransactionCount = new LongAdder();
private final AtomicLong terminatedWriteTransactionCount = new AtomicLong(); private final LongAdder terminatedReadTransactionCount = new LongAdder();
private final LongAdder terminatedWriteTransactionCount = new LongAdder();
private volatile long peakTransactionCount; private volatile long peakTransactionCount;


@Override @Override
public void transactionStarted() public void transactionStarted()
{ {
startedTransactionCount.incrementAndGet(); startedTransactionCount.increment();
long active = activeReadTransactionCount.incrementAndGet(); long active = activeReadTransactionCount.incrementAndGet();
peakTransactionCount = Math.max( peakTransactionCount, active ); peakTransactionCount = Math.max( peakTransactionCount, active );
} }
Expand Down Expand Up @@ -66,7 +69,7 @@ public void transactionTerminated( boolean write )
public void upgradeToWriteTransaction() public void upgradeToWriteTransaction()
{ {
decrementCounter( activeReadTransactionCount, activeWriteTransactionCount, false ); decrementCounter( activeReadTransactionCount, activeWriteTransactionCount, false );
incrementCounter( activeReadTransactionCount, activeWriteTransactionCount, true ); activeWriteTransactionCount.increment();
} }


@Override @Override
Expand All @@ -78,7 +81,7 @@ public long getPeakConcurrentNumberOfTransactions()
@Override @Override
public long getNumberOfStartedTransactions() public long getNumberOfStartedTransactions()
{ {
return startedTransactionCount.get(); return startedTransactionCount.longValue();
} }


@Override @Override
Expand All @@ -90,13 +93,13 @@ public long getNumberOfCommittedTransactions()
@Override @Override
public long getNumberOfCommittedReadTransactions() public long getNumberOfCommittedReadTransactions()
{ {
return committedReadTransactionCount.get(); return committedReadTransactionCount.longValue();
} }


@Override @Override
public long getNumberOfCommittedWriteTransactions() public long getNumberOfCommittedWriteTransactions()
{ {
return committedWriteTransactionCount.get(); return committedWriteTransactionCount.longValue();
} }


@Override @Override
Expand All @@ -108,13 +111,13 @@ public long getNumberOfActiveTransactions()
@Override @Override
public long getNumberOfActiveReadTransactions() public long getNumberOfActiveReadTransactions()
{ {
return activeReadTransactionCount.get(); return activeReadTransactionCount.longValue();
} }


@Override @Override
public long getNumberOfActiveWriteTransactions() public long getNumberOfActiveWriteTransactions()
{ {
return activeWriteTransactionCount.get(); return activeWriteTransactionCount.longValue();
} }


@Override @Override
Expand All @@ -126,13 +129,13 @@ public long getNumberOfTerminatedTransactions()
@Override @Override
public long getNumberOfTerminatedReadTransactions() public long getNumberOfTerminatedReadTransactions()
{ {
return terminatedReadTransactionCount.get(); return terminatedReadTransactionCount.longValue();
} }


@Override @Override
public long getNumberOfTerminatedWriteTransactions() public long getNumberOfTerminatedWriteTransactions()
{ {
return terminatedWriteTransactionCount.get(); return terminatedWriteTransactionCount.longValue();
} }


@Override @Override
Expand All @@ -144,24 +147,36 @@ public long getNumberOfRolledBackTransactions()
@Override @Override
public long getNumberOfRolledBackReadTransactions() public long getNumberOfRolledBackReadTransactions()
{ {
return rolledBackReadTransactionCount.get(); return rolledBackReadTransactionCount.longValue();
} }


@Override @Override
public long getNumberOfRolledBackWriteTransactions() public long getNumberOfRolledBackWriteTransactions()
{ {
return rolledBackWriteTransactionCount.get(); return rolledBackWriteTransactionCount.longValue();
} }


private void incrementCounter( AtomicLong readCount, AtomicLong writeCount, boolean write ) private static void incrementCounter( LongAdder readCount, LongAdder writeCount, boolean write )
{ {
long count = write ? writeCount.incrementAndGet() : readCount.incrementAndGet(); if ( write )
assert count > 0; {
writeCount.increment();
}
else
{
readCount.increment();
}
} }


private void decrementCounter( AtomicLong readCount, AtomicLong writeCount, boolean write ) private static void decrementCounter( AtomicLong readCount, LongAdder writeCount, boolean write )
{ {
long count = write ? writeCount.decrementAndGet() : readCount.decrementAndGet(); if ( write )
assert count >= 0; {
writeCount.decrement();
}
else
{
readCount.decrementAndGet();
}
} }
} }
Expand Up @@ -17,7 +17,7 @@
* You should have received a copy of the GNU General Public License * You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
package org.neo4j.kernel.impl.transaction; package org.neo4j.kernel.impl.transaction.stats;


public interface TransactionCounters public interface TransactionCounters
{ {
Expand Down
Expand Up @@ -19,6 +19,8 @@
*/ */
package org.neo4j.kernel.impl.transaction; package org.neo4j.kernel.impl.transaction;


import org.neo4j.kernel.impl.transaction.stats.TransactionCounters;

import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;


class TransactionCountersChecker class TransactionCountersChecker
Expand Down
Expand Up @@ -73,9 +73,9 @@
import org.neo4j.kernel.impl.store.id.configuration.IdTypeConfigurationProvider; import org.neo4j.kernel.impl.store.id.configuration.IdTypeConfigurationProvider;
import org.neo4j.kernel.impl.transaction.TransactionHeaderInformationFactory; import org.neo4j.kernel.impl.transaction.TransactionHeaderInformationFactory;
import org.neo4j.kernel.impl.transaction.TransactionMonitor; import org.neo4j.kernel.impl.transaction.TransactionMonitor;
import org.neo4j.kernel.impl.transaction.TransactionStats;
import org.neo4j.kernel.impl.transaction.log.checkpoint.StoreCopyCheckPointMutex; import org.neo4j.kernel.impl.transaction.log.checkpoint.StoreCopyCheckPointMutex;
import org.neo4j.kernel.impl.transaction.log.files.LogFileCreationMonitor; import org.neo4j.kernel.impl.transaction.log.files.LogFileCreationMonitor;
import org.neo4j.kernel.impl.transaction.stats.DatabaseTransactionStats;
import org.neo4j.kernel.impl.util.Dependencies; import org.neo4j.kernel.impl.util.Dependencies;
import org.neo4j.kernel.impl.util.UnsatisfiedDependencyException; import org.neo4j.kernel.impl.util.UnsatisfiedDependencyException;
import org.neo4j.kernel.impl.util.collection.CollectionsFactorySupplier; import org.neo4j.kernel.impl.util.collection.CollectionsFactorySupplier;
Expand Down Expand Up @@ -135,7 +135,7 @@ public NeoStoreDataSource getDataSource( File storeDir, FileSystemAbstraction fs
deps -> new DatabaseHealth( mock( DatabasePanicEventGenerator.class ), NullLog.getInstance() ) ); deps -> new DatabaseHealth( mock( DatabasePanicEventGenerator.class ), NullLog.getInstance() ) );
SystemNanoClock clock = dependency( mutableDependencies, SystemNanoClock.class, deps -> Clocks.nanoClock() ); SystemNanoClock clock = dependency( mutableDependencies, SystemNanoClock.class, deps -> Clocks.nanoClock() );
TransactionMonitor transactionMonitor = dependency( mutableDependencies, TransactionMonitor.class, TransactionMonitor transactionMonitor = dependency( mutableDependencies, TransactionMonitor.class,
deps -> new TransactionStats() ); deps -> new DatabaseTransactionStats() );
AvailabilityGuard availabilityGuard = dependency( mutableDependencies, AvailabilityGuard.class, AvailabilityGuard availabilityGuard = dependency( mutableDependencies, AvailabilityGuard.class,
deps -> new AvailabilityGuard( deps.resolveDependency( SystemNanoClock.class ), NullLog.getInstance() ) ); deps -> new AvailabilityGuard( deps.resolveDependency( SystemNanoClock.class ), NullLog.getInstance() ) );
dependency( mutableDependencies, DiagnosticsManager.class, dependency( mutableDependencies, DiagnosticsManager.class,
Expand Down
Expand Up @@ -177,8 +177,9 @@ public GraphDatabaseFacade initFacade( File storeDir, Config config, final Depen
platform.life.add( platform.globalKernelExtensions ); platform.life.add( platform.globalKernelExtensions );
platform.life.add( createBoltServer( platform, edition, databaseManager ) ); platform.life.add( createBoltServer( platform, edition, databaseManager ) );
platform.life.add( new VmPauseMonitorComponent( config, platform.logging.getInternalLog( VmPauseMonitorComponent.class ), platform.jobScheduler ) ); platform.life.add( new VmPauseMonitorComponent( config, platform.logging.getInternalLog( VmPauseMonitorComponent.class ), platform.jobScheduler ) );
platform.dependencies.satisfyDependency( edition.globalTransactionCounter() );
platform.life.add( new PublishPageCacheTracerMetricsAfterStart( platform.tracers.pageCursorTracerSupplier ) ); platform.life.add( new PublishPageCacheTracerMetricsAfterStart( platform.tracers.pageCursorTracerSupplier ) );
DatabaseAvailability databaseAvailability = new DatabaseAvailability( platform.availabilityGuard, platform.transactionMonitor, DatabaseAvailability databaseAvailability = new DatabaseAvailability( platform.availabilityGuard, edition.globalTransactionCounter(), platform.clock,
config.get( GraphDatabaseSettings.shutdown_transaction_end_timeout ).toMillis() ); config.get( GraphDatabaseSettings.shutdown_transaction_end_timeout ).toMillis() );
platform.dependencies.satisfyDependency( databaseAvailability ); platform.dependencies.satisfyDependency( databaseAvailability );
platform.life.add( databaseAvailability ); platform.life.add( databaseAvailability );
Expand Down

0 comments on commit 438cb96

Please sign in to comment.