Skip to content

Commit

Permalink
Add ability to switch between on- and off-heap tx state allocation
Browse files Browse the repository at this point in the history
  • Loading branch information
Andrei Koval committed Mar 6, 2018
1 parent 74cb2da commit 8028ef3
Show file tree
Hide file tree
Showing 28 changed files with 760 additions and 131 deletions.
Expand Up @@ -25,6 +25,8 @@
*/
public interface MemoryTracker
{
MemoryTracker NONE = () -> 0;

/**
* @return number of bytes of direct memory that are used
*/
Expand Down
Expand Up @@ -839,6 +839,19 @@ public enum LabelIndex
public static final Setting<Duration> bookmark_ready_timeout = buildSetting(
"dbms.transaction.bookmark_ready_timeout", DURATION, "30s" ).constraint( min( Duration.ofSeconds( 1 ) ) ).build();

public enum TransactionStateMemoryAllocation
{
ON_HEAP,
OFF_HEAP
}

@Internal
@Description( "[Experimental] Defines whether memory for transaction state should allocaten on- or off-heap." )
public static final Setting<TransactionStateMemoryAllocation> tx_state_memory_allocation = buildSetting(
"unsupported.dbms.tx_state.memory_allocation",
options( TransactionStateMemoryAllocation.class, true ),
TransactionStateMemoryAllocation.ON_HEAP.name() ).build();

// Needed to validate config, accessed via reflection
@SuppressWarnings( "unused" )
public static final HttpConnectorValidator httpValidator = new HttpConnectorValidator();
Expand Down
Expand Up @@ -131,6 +131,7 @@
import org.neo4j.kernel.impl.transaction.state.NeoStoreFileListing;
import org.neo4j.kernel.impl.util.Dependencies;
import org.neo4j.kernel.impl.util.SynchronizedArrayIdOrderingQueue;
import org.neo4j.kernel.impl.util.collection.CollectionsFactorySupplier;
import org.neo4j.kernel.impl.util.monitoring.LogProgressReporter;
import org.neo4j.kernel.impl.util.monitoring.ProgressReporter;
import org.neo4j.kernel.info.DiagnosticsExtractor;
Expand Down Expand Up @@ -267,6 +268,7 @@ boolean applicable( DiagnosticsPhase phase )
private final AvailabilityGuard availabilityGuard;
private final SystemNanoClock clock;
private final StoreCopyCheckPointMutex storeCopyCheckPointMutex;
private final CollectionsFactorySupplier collectionsFactorySupplier;

private Dependencies dependencies;
private LifeSupport life;
Expand Down Expand Up @@ -297,7 +299,7 @@ public NeoStoreDataSource( File storeDir, Config config, IdGeneratorFactory idGe
Tracers tracers, Procedures procedures, IOLimiter ioLimiter, AvailabilityGuard availabilityGuard,
SystemNanoClock clock, AccessCapability accessCapability, StoreCopyCheckPointMutex storeCopyCheckPointMutex,
RecoveryCleanupWorkCollector recoveryCleanupWorkCollector, IdController idController,
OperationalMode operationalMode, VersionContextSupplier versionContextSupplier )
OperationalMode operationalMode, VersionContextSupplier versionContextSupplier, CollectionsFactorySupplier collectionsFactorySupplier )
{
this.storeDir = storeDir;
this.config = config;
Expand Down Expand Up @@ -365,6 +367,7 @@ public Iterable<IndexImplementation> all()
this.commitProcessFactory = commitProcessFactory;
this.pageCache = pageCache;
this.monitors.addMonitorListener( new LoggingLogFileMonitor( msgLog ) );
this.collectionsFactorySupplier = collectionsFactorySupplier;
}

@Override
Expand Down Expand Up @@ -688,7 +691,7 @@ private NeoStoreKernelModule buildKernel( LogFiles logFiles, TransactionAppender
transactionCommitProcess, indexConfigStore, explicitIndexProviderLookup, hooks, transactionMonitor,
availabilityGuard, tracers, storageEngine, procedures, transactionIdStore, clock,
cpuClockRef, heapAllocationRef, accessCapability, DefaultCursors::new, autoIndexing,
explicitIndexStore, versionContextSupplier ) );
explicitIndexStore, versionContextSupplier, collectionsFactorySupplier ) );

buildTransactionMonitor( kernelTransactions, clock, config );

Expand Down
Expand Up @@ -87,6 +87,8 @@
import org.neo4j.kernel.impl.transaction.tracing.CommitEvent;
import org.neo4j.kernel.impl.transaction.tracing.TransactionEvent;
import org.neo4j.kernel.impl.transaction.tracing.TransactionTracer;
import org.neo4j.kernel.impl.util.collection.CollectionsFactory;
import org.neo4j.kernel.impl.util.collection.CollectionsFactorySupplier;
import org.neo4j.resources.CpuClock;
import org.neo4j.resources.HeapAllocation;
import org.neo4j.storageengine.api.StorageCommand;
Expand Down Expand Up @@ -116,6 +118,8 @@ public class KernelTransactionImplementation implements KernelTransaction, TxSta
private static final long NOT_COMMITTED_TRANSACTION_ID = -1;
private static final long NOT_COMMITTED_TRANSACTION_COMMIT_TIME = -1;

private final CollectionsFactory collectionsFactory;

// Logic
private final SchemaWriteGuard schemaWriteGuard;
private final TransactionHooks hooks;
Expand All @@ -138,7 +142,7 @@ public class KernelTransactionImplementation implements KernelTransaction, TxSta

// State that needs to be reset between uses. Most of these should be cleared or released in #release(),
// whereas others, such as timestamp or txId when transaction starts, even locks, needs to be set in #initialize().
private TransactionState txState;
private TxState txState;
private ExplicitIndexTransactionState explicitIndexTransactionState;
private TransactionWriteState writeState;
private TransactionHooks.TransactionHooksState hooksState;
Expand Down Expand Up @@ -184,7 +188,7 @@ public KernelTransactionImplementation( StatementOperationParts statementOperati
Pool<KernelTransactionImplementation> pool, Clock clock, AtomicReference<CpuClock> cpuClockRef, AtomicReference<HeapAllocation> heapAllocationRef,
TransactionTracer transactionTracer, LockTracer lockTracer, PageCursorTracerSupplier cursorTracerSupplier,
StorageEngine storageEngine, AccessCapability accessCapability, DefaultCursors cursors, AutoIndexing autoIndexing,
ExplicitIndexStore explicitIndexStore, VersionContextSupplier versionContextSupplier )
ExplicitIndexStore explicitIndexStore, VersionContextSupplier versionContextSupplier, CollectionsFactorySupplier collectionsFactorySupplier )
{
this.statementOperations = statementOperations;
this.schemaWriteGuard = schemaWriteGuard;
Expand Down Expand Up @@ -216,6 +220,7 @@ procedures, accessCapability, lockTracer, statementOperations, new ClockContext(
new IndexTxStateUpdater( storageEngine.storeReadLayer(), allStoreHolder ),
storageStatement,
this, new KernelToken( storeLayer, this ), cursors, autoIndexing );
this.collectionsFactory = collectionsFactorySupplier.create();
}

/**
Expand Down Expand Up @@ -409,7 +414,7 @@ public TransactionState txState()
if ( txState == null )
{
transactionMonitor.upgradeToWriteTransaction();
txState = new TxState();
txState = new TxState( collectionsFactory );
}
return txState;
}
Expand Down Expand Up @@ -851,7 +856,11 @@ private void release()
securityContext = null;
transactionEvent = null;
explicitIndexTransactionState = null;
txState = null;
if ( txState != null )
{
txState.release();
txState = null;
}
hooksState = null;
closeListeners.clear();
reuseCount++;
Expand Down Expand Up @@ -1005,11 +1014,21 @@ protected void init( long threadId, PageCursorTracer pageCursorTracer )
* Returns number of allocated bytes by current transaction.
* @return number of allocated bytes by the thread.
*/
long heapAllocateBytes()
long heapAllocatedBytes()
{
return heapAllocation.allocatedBytes( transactionThreadId ) - heapAllocatedBytesWhenQueryStarted;
}

/**
* Returns amount of direct memory allocated by current transaction.
*
* @return amount of direct memory allocated by the thread in bytes.
*/
long directAllocatedBytes()
{
return transaction.collectionsFactory.getMemoryTracker().usedDirectMemory();
}

/**
* Return CPU time used by current transaction in milliseconds
* @return the current CPU time used by the transaction, in milliseconds.
Expand Down
Expand Up @@ -54,6 +54,7 @@
import org.neo4j.kernel.impl.transaction.TransactionMonitor;
import org.neo4j.kernel.impl.transaction.log.TransactionIdStore;
import org.neo4j.kernel.impl.util.MonotonicCounter;
import org.neo4j.kernel.impl.util.collection.CollectionsFactorySupplier;
import org.neo4j.kernel.lifecycle.LifecycleAdapter;
import org.neo4j.kernel.monitoring.tracing.Tracers;
import org.neo4j.resources.CpuClock;
Expand Down Expand Up @@ -98,6 +99,7 @@ public class KernelTransactions extends LifecycleAdapter implements Supplier<Ker
private final Supplier<DefaultCursors> cursorsSupplier;
private final AutoIndexing autoIndexing;
private final ExplicitIndexStore explicitIndexStore;
private final CollectionsFactorySupplier collectionsFactorySupplier;

/**
* Used to enumerate all transactions in the system, active and idle ones.
Expand Down Expand Up @@ -138,7 +140,9 @@ public KernelTransactions( StatementLocksFactory statementLocksFactory,
AtomicReference<CpuClock> cpuClockRef, AtomicReference<HeapAllocation> heapAllocationRef, AccessCapability accessCapability,
Supplier<DefaultCursors> cursorsSupplier,
AutoIndexing autoIndexing,
ExplicitIndexStore explicitIndexStore, VersionContextSupplier versionContextSupplier )
ExplicitIndexStore explicitIndexStore,
VersionContextSupplier versionContextSupplier,
CollectionsFactorySupplier collectionsFactorySupplier )
{
this.statementLocksFactory = statementLocksFactory;
this.constraintIndexCreator = constraintIndexCreator;
Expand All @@ -165,6 +169,7 @@ public KernelTransactions( StatementLocksFactory statementLocksFactory,
this.clock = clock;
doBlockNewTransactions();
this.cursorsSupplier = cursorsSupplier;
this.collectionsFactorySupplier = collectionsFactorySupplier;
}

public Supplier<ExplicitIndexTransactionState> explicitIndexTxStateSupplier()
Expand Down Expand Up @@ -362,7 +367,7 @@ public KernelTransactionImplementation newInstance()
clock, cpuClockRef, heapAllocationRef, tracers.transactionTracer, tracers.lockTracer,
tracers.pageCursorTracerSupplier, storageEngine, accessCapability,
cursorsSupplier.get(), autoIndexing,
explicitIndexStore, versionContextSupplier );
explicitIndexStore, versionContextSupplier, collectionsFactorySupplier );
this.transactions.add( tx );
return tx;
}
Expand Down
Expand Up @@ -27,7 +27,8 @@ public class TransactionExecutionStatistic
{
public static final TransactionExecutionStatistic NOT_AVAILABLE = new TransactionExecutionStatistic();

private final Long heapAllocateBytes;
private final Long heapAllocatedBytes;
private final Long directAllocatedBytes;
private final Long cpuTimeMillis;
private final long waitTimeMillis;
private final long elapsedTimeMillis;
Expand All @@ -37,7 +38,8 @@ public class TransactionExecutionStatistic

private TransactionExecutionStatistic()
{
heapAllocateBytes = null;
heapAllocatedBytes = null;
directAllocatedBytes = null;
cpuTimeMillis = null;
waitTimeMillis = -1;
elapsedTimeMillis = -1;
Expand All @@ -52,19 +54,23 @@ public TransactionExecutionStatistic( KernelTransactionImplementation tx, System
long nowNanos = clock.nanos();
KernelTransactionImplementation.Statistics statistics = tx.getStatistics();
this.waitTimeMillis = NANOSECONDS.toMillis( statistics.getWaitingTimeNanos( nowNanos ) );
long heapAllocateBytes = statistics.heapAllocateBytes();
this.heapAllocateBytes = heapAllocateBytes >= 0 ? heapAllocateBytes : null;
long cpuTimeMillis = statistics.cpuTimeMillis();
this.cpuTimeMillis = cpuTimeMillis >= 0 ? cpuTimeMillis : null;
this.heapAllocatedBytes = nullIfNegative( statistics.heapAllocatedBytes() );
this.directAllocatedBytes = nullIfNegative( statistics.heapAllocatedBytes() );
this.cpuTimeMillis = nullIfNegative( statistics.cpuTimeMillis() );
this.pageFaults = statistics.totalTransactionPageCacheFaults();
this.pageHits = statistics.totalTransactionPageCacheHits();
this.elapsedTimeMillis = nowMillis - startTimeMillis;
this.idleTimeMillis = this.cpuTimeMillis != null ? elapsedTimeMillis - this.cpuTimeMillis - waitTimeMillis : null;
}

public Long getHeapAllocateBytes()
public Long getHeapAllocatedBytes()
{
return heapAllocateBytes;
return heapAllocatedBytes;
}

public Long getDirectAllocatedBytes()
{
return directAllocatedBytes;
}

public Long getCpuTimeMillis()
Expand Down Expand Up @@ -96,4 +102,9 @@ public long getPageFaults()
{
return pageFaults;
}

private static Long nullIfNegative( long value )
{
return value >= 0 ? value : null;
}
}
Expand Up @@ -202,7 +202,7 @@ public ReadableDiffSets<Integer> labelDiffSets()

DiffSets<Integer> getOrCreateLabelDiffSets()
{
if ( null == labelDiffSets )
if ( labelDiffSets == null )
{
labelDiffSets = new DiffSets<>();
}
Expand Down
Expand Up @@ -22,7 +22,7 @@
import java.util.Map;

import org.neo4j.kernel.impl.util.VersionedHashMap;
import org.neo4j.kernel.impl.util.diffsets.EmptyPrimitiveLongReadableDiffSets;
import org.neo4j.kernel.impl.util.collection.CollectionsFactory;
import org.neo4j.kernel.impl.util.diffsets.PrimitiveLongDiffSets;
import org.neo4j.storageengine.api.txstate.PrimitiveLongReadableDiffSets;

Expand All @@ -31,8 +31,15 @@
*/
public class PropertyChanges
{
private final CollectionsFactory collectionsFactory;

private VersionedHashMap<Integer, Map<Object,PrimitiveLongDiffSets>> changes;

public PropertyChanges( CollectionsFactory collectionsFactory )
{
this.collectionsFactory = collectionsFactory;
}

public PrimitiveLongReadableDiffSets changesForProperty( int propertyKeyId, Object value )
{
if ( changes != null )
Expand All @@ -47,7 +54,7 @@ public PrimitiveLongReadableDiffSets changesForProperty( int propertyKeyId, Obje
}
}
}
return EmptyPrimitiveLongReadableDiffSets.INSTANCE;
return PrimitiveLongReadableDiffSets.EMPTY;
}

public void changeProperty( long entityId, int propertyKeyId, Object oldValue, Object newValue )
Expand Down Expand Up @@ -79,6 +86,14 @@ private Map<Object, PrimitiveLongDiffSets> keyChanges( int propertyKeyId )

private PrimitiveLongDiffSets valueChanges( Object newValue, Map<Object, PrimitiveLongDiffSets> keyChanges )
{
return keyChanges.computeIfAbsent( newValue, k -> new PrimitiveLongDiffSets() );
return keyChanges.computeIfAbsent( newValue, k -> collectionsFactory.newLongDiffSets() );
}

public void release()
{
for ( final Map<Object, PrimitiveLongDiffSets> map : changes.values() )
{
map.values().forEach( PrimitiveLongDiffSets::close );
}
}
}

0 comments on commit 8028ef3

Please sign in to comment.