Skip to content

Commit

Permalink
Allows import monitor to notice use of PageCache for caching
Browse files Browse the repository at this point in the history
Caching for temporary importer data structures, that is.
This allows user to become aware of the fact that an performance
of an import can degrade heavily when that happens.

Some amount of information is also provided as to why of/off-heap
allocators couldn't be used.
  • Loading branch information
tinwelint committed Jul 31, 2018
1 parent 8dee3a6 commit 8a393f2
Show file tree
Hide file tree
Showing 14 changed files with 294 additions and 45 deletions.
Expand Up @@ -61,7 +61,7 @@ public CountsComputer( NeoStores stores, PageCache pageCache )
stores.getNodeStore(), stores.getRelationshipStore(),
(int) stores.getLabelTokenStore().getHighId(),
(int) stores.getRelationshipTypeTokenStore().getHighId(),
NumberArrayFactory.auto( pageCache, stores.getStoreDir(), true ) );
NumberArrayFactory.auto( pageCache, stores.getStoreDir(), true, NumberArrayFactory.NO_MONITOR ) );
}

public CountsComputer( long lastCommittedTransactionId, NodeStore nodes, RelationshipStore relationships,
Expand Down
Expand Up @@ -178,7 +178,7 @@ private void rebuildCountsFromScratch( File storeDirToReadFrom, File migrationDi
int highLabelId = (int) neoStores.getLabelTokenStore().getHighId();
int highRelationshipTypeId = (int) neoStores.getRelationshipTypeTokenStore().getHighId();
CountsComputer initializer = new CountsComputer( lastTxId, nodeStore, relationshipStore, highLabelId,
highRelationshipTypeId, NumberArrayFactory.auto( pageCache, migrationDir, true ),
highRelationshipTypeId, NumberArrayFactory.auto( pageCache, migrationDir, true, NumberArrayFactory.NO_MONITOR ),
progressMonitor );
life.add( new CountsTracker( logProvider, fileSystem, pageCache, config,
storeFileBase, EmptyVersionContextSupplier.EMPTY ).setInitializer( initializer ) );
Expand Down
Expand Up @@ -51,6 +51,7 @@
import org.neo4j.unsafe.impl.batchimport.cache.NodeRelationshipCache;
import org.neo4j.unsafe.impl.batchimport.cache.NodeType;
import org.neo4j.unsafe.impl.batchimport.cache.NumberArrayFactory;
import org.neo4j.unsafe.impl.batchimport.cache.PageCacheArrayFactoryMonitor;
import org.neo4j.unsafe.impl.batchimport.cache.idmapping.IdMapper;
import org.neo4j.unsafe.impl.batchimport.input.CachedInput;
import org.neo4j.unsafe.impl.batchimport.input.Collector;
Expand Down Expand Up @@ -158,6 +159,7 @@ public void insufficientAvailableMemory( long estimatedCacheSize, long optimalMi
private NodeLabelsCache nodeLabelsCache;
private long startTime;
private InputCache inputCache;
private PageCacheArrayFactoryMonitor numberArrayFactoryMonitor;
private NumberArrayFactory numberArrayFactory;
private Collector badCollector;
private IdMapper idMapper;
Expand Down Expand Up @@ -195,7 +197,8 @@ public void initialize( Input input ) throws IOException
startTime = currentTimeMillis();
inputCache = new InputCache( fileSystem, storeDir, recordFormats, toIntExact( mebiBytes( 1 ) ) );
this.input = CachedInput.cacheAsNecessary( input, inputCache );
numberArrayFactory = auto( neoStore.getPageCache(), storeDir, config.allowCacheAllocationOnHeap() );
numberArrayFactoryMonitor = new PageCacheArrayFactoryMonitor();
numberArrayFactory = auto( neoStore.getPageCache(), storeDir, config.allowCacheAllocationOnHeap(), numberArrayFactoryMonitor );
badCollector = input.badCollector();
// Some temporary caches and indexes in the import
idMapper = input.idMapper( numberArrayFactory );
Expand All @@ -208,7 +211,7 @@ public void initialize( Input input ) throws IOException
nodeRelationshipCache.memoryEstimation( inputEstimates.numberOfNodes() ),
idMapper.memoryEstimation( inputEstimates.numberOfNodes() ) );

dependencies.satisfyDependencies( inputEstimates, idMapper, neoStore, nodeRelationshipCache );
dependencies.satisfyDependencies( inputEstimates, idMapper, neoStore, nodeRelationshipCache, numberArrayFactoryMonitor );

if ( neoStore.determineDoubleRelationshipRecordUnits( inputEstimates ) )
{
Expand Down
Expand Up @@ -34,14 +34,14 @@ public class ChunkedNumberArrayFactory extends NumberArrayFactory.Adapter
private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - Short.MAX_VALUE;
private final NumberArrayFactory delegate;

ChunkedNumberArrayFactory()
ChunkedNumberArrayFactory( Monitor monitor )
{
this( OFF_HEAP, HEAP );
this( monitor, OFF_HEAP, HEAP );
}

ChunkedNumberArrayFactory( NumberArrayFactory... delegateList )
ChunkedNumberArrayFactory( Monitor monitor, NumberArrayFactory... delegateList )
{
delegate = new Auto( delegateList );
delegate = new Auto( monitor, delegateList );
}

@Override
Expand Down
Expand Up @@ -24,6 +24,7 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.function.Function;

import org.neo4j.helpers.Exceptions;
Expand All @@ -42,6 +43,23 @@
*/
public interface NumberArrayFactory
{
interface Monitor
{
/**
* Notifies about a successful allocation where information about both successful and failed attempts are included.
*
* @param memory amount of memory for this allocation.
* @param successfulFactory the {@link NumberArrayFactory} which proved successful allocating this amount of memory.
* @param attemptedAllocationFailures list of failed attempts to allocate this memory in other allocators.
*/
void allocationSuccessful( long memory, NumberArrayFactory successfulFactory, Iterable<AllocationFailure> attemptedAllocationFailures );
}

Monitor NO_MONITOR = ( memory, successfulFactory, attemptedAllocationFailures ) ->
{
// no-op
};

/**
* Puts arrays inside the heap.
*/
Expand Down Expand Up @@ -107,12 +125,12 @@ public String toString()
* ({@link #newLongArray(long, long)} and {@link #newIntArray(long, int)} into smaller chunks where
* some can live on heap and some off heap.
*/
NumberArrayFactory CHUNKED_FIXED_SIZE = new ChunkedNumberArrayFactory();
NumberArrayFactory CHUNKED_FIXED_SIZE = new ChunkedNumberArrayFactory( NumberArrayFactory.NO_MONITOR );

/**
* {@link Auto} factory which uses JVM stats for gathering information about available memory.
*/
NumberArrayFactory AUTO_WITHOUT_PAGECACHE = new Auto( OFF_HEAP, HEAP, CHUNKED_FIXED_SIZE );
NumberArrayFactory AUTO_WITHOUT_PAGECACHE = new Auto( NumberArrayFactory.NO_MONITOR, OFF_HEAP, HEAP, CHUNKED_FIXED_SIZE );

/**
* {@link Auto} factory which has a page cache backed number array as final fallback, in order to prevent OOM
Expand All @@ -121,15 +139,16 @@ public String toString()
* @param dir directory where cached files are placed.
* @param allowHeapAllocation whether or not to allow allocation on heap. Otherwise allocation is restricted
* to off-heap and the page cache fallback. This to be more in control of available space in the heap at all times.
* @param monitor for monitoring successful and failed allocations and which factory was selected.
* @return a {@link NumberArrayFactory} which tries to allocation off-heap, then potentially on heap
* and lastly falls back to allocating inside the given {@code pageCache}.
*/
static NumberArrayFactory auto( PageCache pageCache, File dir, boolean allowHeapAllocation )
static NumberArrayFactory auto( PageCache pageCache, File dir, boolean allowHeapAllocation, Monitor monitor )
{
PageCachedNumberArrayFactory pagedArrayFactory = new PageCachedNumberArrayFactory( pageCache, dir );
ChunkedNumberArrayFactory chunkedArrayFactory = new ChunkedNumberArrayFactory(
ChunkedNumberArrayFactory chunkedArrayFactory = new ChunkedNumberArrayFactory( monitor,
allocationAlternatives( allowHeapAllocation, pagedArrayFactory ) );
return new Auto( allocationAlternatives( allowHeapAllocation, chunkedArrayFactory ) );
return new Auto( monitor, allocationAlternatives( allowHeapAllocation, chunkedArrayFactory ) );
}

/**
Expand All @@ -148,17 +167,41 @@ static NumberArrayFactory[] allocationAlternatives( boolean allowHeapAllocation,
return result.toArray( new NumberArrayFactory[result.size()] );
}

class AllocationFailure
{
private final Throwable failure;
private final NumberArrayFactory factory;

AllocationFailure( Throwable failure, NumberArrayFactory factory )
{
this.failure = failure;
this.factory = factory;
}

public Throwable getFailure()
{
return failure;
}

public NumberArrayFactory getFactory()
{
return factory;
}
}

/**
* Looks at available memory and decides where the requested array fits best. Tries to allocate the whole
* array with the first candidate, falling back to others as needed.
*/
class Auto extends Adapter
{

private final Monitor monitor;
private final NumberArrayFactory[] candidates;

public Auto( NumberArrayFactory... candidates )
public Auto( Monitor monitor, NumberArrayFactory... candidates )
{
Objects.requireNonNull( monitor );
this.monitor = monitor;
this.candidates = candidates;
}

Expand All @@ -183,14 +226,17 @@ public ByteArray newByteArray( long length, byte[] defaultValue, long base )
private <T extends NumberArray<? extends T>> T tryAllocate( long length, int itemSize,
Function<NumberArrayFactory,T> allocator )
{
List<AllocationFailure> failures = new ArrayList<>();
OutOfMemoryError error = null;
for ( NumberArrayFactory candidate : candidates )
{
try
{
try
{
return allocator.apply( candidate );
T array = allocator.apply( candidate );
monitor.allocationSuccessful( length * itemSize, candidate, failures );
return array;
}
catch ( ArithmeticException e )
{
Expand All @@ -208,6 +254,7 @@ private <T extends NumberArray<? extends T>> T tryAllocate( long length, int ite
e.addSuppressed( error );
error = e;
}
failures.add( new AllocationFailure( e, candidate ) );
}
}
throw error( length, itemSize, error );
Expand Down Expand Up @@ -302,7 +349,6 @@ default ByteArray newByteArray( long length, byte[] defaultValue )

abstract class Adapter implements NumberArrayFactory
{

@Override
public IntArray newDynamicIntArray( long chunkSize, int defaultValue )
{
Expand All @@ -320,6 +366,5 @@ public ByteArray newDynamicByteArray( long chunkSize, byte[] defaultValue )
{
return new DynamicByteArray( this, chunkSize, defaultValue );
}

}
}
@@ -0,0 +1,72 @@
/*
* Copyright (c) 2002-2018 "Neo4j,"
* Neo4j Sweden AB [http://neo4j.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.unsafe.impl.batchimport.cache;

import java.util.concurrent.atomic.AtomicReference;

import org.neo4j.io.pagecache.PageCache;

import static java.lang.String.format;
import static org.neo4j.helpers.Format.bytes;

public class PageCacheArrayFactoryMonitor implements NumberArrayFactory.Monitor
{
// This field is designed to allow multiple threads setting it concurrently, where one of those will win and either one is fine
// because this monitor mostly revolves around highlighting the fact that the page cache number array is in use at all.
private final AtomicReference<String> failedFactoriesDescription = new AtomicReference<>();

@Override
public void allocationSuccessful( long memory, NumberArrayFactory successfulFactory,
Iterable<NumberArrayFactory.AllocationFailure> attemptedAllocationFailures )
{
if ( successfulFactory instanceof PageCachedNumberArrayFactory )
{
StringBuilder builder =
new StringBuilder( format( "Memory allocation of %s ended up in page cache, which may impact performance negatively", bytes( memory ) ) );
attemptedAllocationFailures.forEach(
failure -> builder.append( format( "%n%s: %s", failure.getFactory().toString(), failure.getFailure().toString() ) ) );
failedFactoriesDescription.compareAndSet( null, builder.toString() );
}
}

/**
* Called by user-facing progress monitor at arbitrary points to get information about whether or not there has been
* one or more {@link NumberArrayFactory} allocations backed by the {@link PageCache}, this because it severely affects
* performance. Calling this method clears the failure description, if any.
*
* @return if there have been {@link NumberArrayFactory} allocations backed by the {@link PageCache} since the last call to this method
* then a description of why it was chosen is returned, otherwise {@code null}.
*/
public String pageCacheAllocationOrNull()
{
String failure = failedFactoriesDescription.get();
try
{
return failure;
}
finally
{
if ( failure != null )
{
failedFactoriesDescription.compareAndSet( failure, null );
}
}
}
}
Expand Up @@ -32,6 +32,7 @@
import org.neo4j.unsafe.impl.batchimport.ScanAndCacheGroupsStage;
import org.neo4j.unsafe.impl.batchimport.SparseNodeFirstRelationshipStage;
import org.neo4j.unsafe.impl.batchimport.cache.NodeRelationshipCache;
import org.neo4j.unsafe.impl.batchimport.cache.PageCacheArrayFactoryMonitor;
import org.neo4j.unsafe.impl.batchimport.cache.idmapping.IdMapper;
import org.neo4j.unsafe.impl.batchimport.input.Input;
import org.neo4j.unsafe.impl.batchimport.input.Input.Estimates;
Expand Down Expand Up @@ -70,7 +71,7 @@ public interface ExternalMonitor
boolean somethingElseBrokeMyNiceOutput();
}

public static final ExternalMonitor NO_EXTERNAL_MONITOR = () -> false;
static final ExternalMonitor NO_EXTERNAL_MONITOR = () -> false;

enum ImportStage
{
Expand All @@ -95,6 +96,7 @@ enum ImportStage
private final ExternalMonitor externalMonitor;
private DependencyResolver dependencyResolver;
private boolean newInternalStage;
private PageCacheArrayFactoryMonitor pageCacheArrayFactoryMonitor;

// progress of current stage
private long goal;
Expand All @@ -103,7 +105,7 @@ enum ImportStage
private ImportStage currentStage;
private long lastReportTime;

public HumanUnderstandableExecutionMonitor( PrintStream out, Monitor monitor, ExternalMonitor externalMonitor )
HumanUnderstandableExecutionMonitor( PrintStream out, Monitor monitor, ExternalMonitor externalMonitor )
{
this.out = out;
this.monitor = monitor;
Expand All @@ -118,6 +120,7 @@ public void initialize( DependencyResolver dependencyResolver )
BatchingNeoStores neoStores = dependencyResolver.resolveDependency( BatchingNeoStores.class );
IdMapper idMapper = dependencyResolver.resolveDependency( IdMapper.class );
NodeRelationshipCache nodeRelationshipCache = dependencyResolver.resolveDependency( NodeRelationshipCache.class );
pageCacheArrayFactoryMonitor = dependencyResolver.resolveDependency( PageCacheArrayFactoryMonitor.class );

long biggestCacheMemory = estimatedCacheSize( neoStores,
nodeRelationshipCache.memoryEstimation( estimates.numberOfNodes() ),
Expand All @@ -130,7 +133,6 @@ ESTIMATED_NUMBER_OF_RELATIONSHIP_PROPERTIES, count( estimates.numberOfRelationsh
ESTIMATED_DISK_SPACE_USAGE, bytes(
nodesDiskUsage( estimates, neoStores ) +
relationshipsDiskUsage( estimates, neoStores ) +
// TODO also add some padding to include relationship groups?
estimates.sizeOfNodeProperties() + estimates.sizeOfRelationshipProperties() ),
ESTIMATED_REQUIRED_MEMORY_USAGE, bytes( biggestCacheMemory ) );
out.println();
Expand Down Expand Up @@ -216,13 +218,11 @@ else if ( includeStage( execution ) )

private void endPrevious()
{
updateProgress( goal ); // previous ended
// TODO print some end stats for this stage?
updateProgress( goal );
}

private void initializeNodeImport( Estimates estimates, IdMapper idMapper, BatchingNeoStores neoStores )
{
// TODO how to handle UNKNOWN?
long numberOfNodes = estimates.numberOfNodes();
printStageHeader( "(1/4) Node import",
ESTIMATED_NUMBER_OF_NODES, count( numberOfNodes ),
Expand Down Expand Up @@ -340,7 +340,6 @@ private void updateProgress( long progress )
}
}

// TODO not quite right
this.progress = max( this.progress, progress );
}

Expand Down Expand Up @@ -378,6 +377,19 @@ private void printDots( int from, int target )
}
out.print( dotChar );
current++;

printPageCacheAllocationWarningIfUsed();
}
}

private void printPageCacheAllocationWarningIfUsed()
{
String allocation = pageCacheArrayFactoryMonitor.pageCacheAllocationOrNull();
if ( allocation != null )
{
System.err.println();
System.err.println( "WARNING:" );
System.err.println( allocation );
}
}

Expand Down

0 comments on commit 8a393f2

Please sign in to comment.