Skip to content

Commit

Permalink
Reworked sanity checking of memory requirements in importer
Browse files Browse the repository at this point in the history
  • Loading branch information
tinwelint committed Jan 24, 2018
1 parent bbdcdfa commit ff082ee
Show file tree
Hide file tree
Showing 14 changed files with 536 additions and 40 deletions.
Expand Up @@ -83,7 +83,6 @@
import static org.neo4j.helpers.Format.bytes; import static org.neo4j.helpers.Format.bytes;
import static org.neo4j.helpers.Strings.TAB; import static org.neo4j.helpers.Strings.TAB;
import static org.neo4j.helpers.TextUtil.tokenizeStringWithQuotes; import static org.neo4j.helpers.TextUtil.tokenizeStringWithQuotes;
import static org.neo4j.io.ByteUnit.gibiBytes;
import static org.neo4j.io.ByteUnit.mebiBytes; import static org.neo4j.io.ByteUnit.mebiBytes;
import static org.neo4j.io.fs.FileUtils.readTextFile; import static org.neo4j.io.fs.FileUtils.readTextFile;
import static org.neo4j.kernel.configuration.Settings.parseLongWithUnit; import static org.neo4j.kernel.configuration.Settings.parseLongWithUnit;
Expand Down Expand Up @@ -111,12 +110,6 @@
*/ */
public class ImportTool public class ImportTool
{ {
/**
* A high heap size that no import should need to go beyond. Also it's small enough such that object pointers
* can be compressed to 4 bytes.
*/
private static final long RECOMMENDED_MAX_HEAP_SIZE = gibiBytes( 28 );

private static final String INPUT_FILES_DESCRIPTION = private static final String INPUT_FILES_DESCRIPTION =
"Multiple files will be logically seen as one big file " + "Multiple files will be logically seen as one big file " +
"from the perspective of the importer. " + "from the perspective of the importer. " +
Expand Down Expand Up @@ -392,11 +385,6 @@ public static void main( String[] incomingArguments, boolean defaultSettingsSuit
{ {
System.err.println( format( "WARNING: neo4j-import is deprecated and support for it will be removed in a future%n" + System.err.println( format( "WARNING: neo4j-import is deprecated and support for it will be removed in a future%n" +
"version of Neo4j; please use neo4j-admin import instead." ) ); "version of Neo4j; please use neo4j-admin import instead." ) );
if ( Runtime.getRuntime().maxMemory() > RECOMMENDED_MAX_HEAP_SIZE )
{
System.err.println( format( "WARNING: heap size is unnecessarily big, potentially causing garbage collection issues and%n" +
"less room for importer off-heap caches. A heap of maximum %s is recommended", bytes( RECOMMENDED_MAX_HEAP_SIZE ) ) );
}


PrintStream out = System.out; PrintStream out = System.out;
PrintStream err = System.err; PrintStream err = System.err;
Expand Down
Expand Up @@ -23,6 +23,8 @@


import org.neo4j.unsafe.impl.batchimport.ImportLogic; import org.neo4j.unsafe.impl.batchimport.ImportLogic;


import static org.neo4j.helpers.Format.bytes;

class PrintingImportLogicMonitor implements ImportLogic.Monitor class PrintingImportLogicMonitor implements ImportLogic.Monitor
{ {
private final PrintStream out; private final PrintStream out;
Expand Down Expand Up @@ -53,4 +55,28 @@ public void mayExceedRelationshipIdCapacity( long capacity, long estimatedCount
err.printf( "WARNING: estimated number of nodes %d may exceed capacity %d of selected record format%n", err.printf( "WARNING: estimated number of nodes %d may exceed capacity %d of selected record format%n",
estimatedCount, capacity ); estimatedCount, capacity );
} }

/**
 * Prints a warning on the error stream saying that the current heap may be too small to complete the import,
 * together with the suggested heap size.
 *
 * @param optimalMinimalHeapSize suggested heap size; rendered with {@code bytes(long)}.
 * @param heapSize current heap size; rendered with {@code bytes(long)}.
 */
@Override
public void insufficientHeapSize( long optimalMinimalHeapSize, long heapSize )
{
    // End with %n, like the capacity warnings in this class, so that whatever is printed next starts on a new line.
    err.printf( "WARNING: heap size %s may be too small to complete this import. Suggested heap size is %s%n",
            bytes( heapSize ), bytes( optimalMinimalHeapSize ) );
}

/**
 * Prints a warning on the error stream saying that the heap is larger than the import needs, which leaves less
 * memory for the importer's off-heap caches, together with the suggested heap size.
 *
 * @param optimalMinimalHeapSize suggested heap size; rendered with {@code bytes(long)}.
 * @param heapSize current heap size; rendered with {@code bytes(long)}.
 */
@Override
public void abundantHeapSize( long optimalMinimalHeapSize, long heapSize )
{
    // End with %n, like the capacity warnings in this class, so that whatever is printed next starts on a new line.
    err.printf( "WARNING: heap size %s is unnecessarily large for completing this import.%n" +
            "The abundant heap memory will leave less memory for off-heap importer caches. Suggested heap size is %s%n",
            bytes( heapSize ), bytes( optimalMinimalHeapSize ) );
}

/**
 * Prints a warning on the error stream saying that the available memory may not be enough to complete the
 * import, followed by a suggested split between heap and memory outside the heap.
 *
 * @param estimatedCacheSize suggested minimum of free memory excluding the heap; rendered with {@code bytes(long)}.
 * @param optimalMinimalHeapSize suggested heap size; rendered with {@code bytes(long)}.
 * @param availableMemory the amount of memory reported as available; rendered with {@code bytes(long)}.
 */
@Override
public void insufficientAvailableMemory( long estimatedCacheSize, long optimalMinimalHeapSize, long availableMemory )
{
    // End with %n, like the capacity warnings in this class, so that whatever is printed next starts on a new line.
    err.printf( "WARNING: %s memory may not be sufficient to complete this import. Suggested memory distribution is:%n" +
            "heap size: %s%n" +
            "minimum free and available memory excluding heap size: %s%n",
            bytes( availableMemory ), bytes( optimalMinimalHeapSize ), bytes( estimatedCacheSize ) );
}
} }
@@ -0,0 +1,126 @@
package org.neo4j.tooling;

import org.junit.After;
import org.junit.Test;

import java.io.ByteArrayOutputStream;
import java.io.PrintStream;

import org.neo4j.unsafe.impl.batchimport.ImportLogic;

import static org.junit.Assert.assertTrue;

import static org.neo4j.helpers.Format.bytes;
import static org.neo4j.io.ByteUnit.gibiBytes;

/**
 * Checks that every warning printed by {@link PrintingImportLogicMonitor} can be formatted and mentions the values
 * it was given. The implementation is little more than a handful of printf calls, but a format string whose
 * arguments do not line up throws at runtime, and that is a surprisingly easy mistake to make.
 */
public class PrintingImportLogicMonitorTest
{
    private final ByteArrayOutputStream outBuffer = new ByteArrayOutputStream();
    private final ByteArrayOutputStream errBuffer = new ByteArrayOutputStream();
    private final ImportLogic.Monitor monitor =
            new PrintingImportLogicMonitor( new PrintStream( outBuffer ), new PrintStream( errBuffer ) );

    /**
     * Echoes whatever the monitor captured during the test to the real standard output.
     */
    @After
    public void after()
    {
        System.out.println( "out " + outBuffer );
        System.out.println( "err " + errBuffer );
    }

    @Test
    public void mayExceedNodeIdCapacity() throws Exception
    {
        // given
        long capacity = 10_000_000;
        long estimatedCount = 12_000_000;

        // when
        monitor.mayExceedNodeIdCapacity( capacity, estimatedCount );

        // then
        assertErrorOutputContains( "WARNING", "exceed", String.valueOf( capacity ), String.valueOf( estimatedCount ) );
    }

    @Test
    public void mayExceedRelationshipIdCapacity() throws Exception
    {
        // given
        long capacity = 10_000_000;
        long estimatedCount = 12_000_000;

        // when
        monitor.mayExceedRelationshipIdCapacity( capacity, estimatedCount );

        // then
        assertErrorOutputContains( "WARNING", "exceed", String.valueOf( capacity ), String.valueOf( estimatedCount ) );
    }

    @Test
    public void insufficientHeapSize() throws Exception
    {
        // given
        long optimalHeapSize = gibiBytes( 2 );
        long heapSize = gibiBytes( 1 );

        // when
        monitor.insufficientHeapSize( optimalHeapSize, heapSize );

        // then
        assertErrorOutputContains( "WARNING", "too small", bytes( heapSize ), bytes( optimalHeapSize ) );
    }

    @Test
    public void abundantHeapSize() throws Exception
    {
        // given
        long optimalHeapSize = gibiBytes( 2 );
        long heapSize = gibiBytes( 10 );

        // when
        monitor.abundantHeapSize( optimalHeapSize, heapSize );

        // then
        assertErrorOutputContains( "WARNING", "unnecessarily large", bytes( heapSize ), bytes( optimalHeapSize ) );
    }

    @Test
    public void insufficientAvailableMemory() throws Exception
    {
        // given
        long estimatedCacheSize = gibiBytes( 2 );
        long optimalHeapSize = gibiBytes( 2 );
        long availableMemory = gibiBytes( 1 );

        // when
        monitor.insufficientAvailableMemory( estimatedCacheSize, optimalHeapSize, availableMemory );

        // then
        assertErrorOutputContains( "WARNING", "may not be sufficient", bytes( estimatedCacheSize ),
                bytes( optimalHeapSize ), bytes( availableMemory ) );
    }

    /**
     * Asserts, in the given order, that each fragment occurs somewhere in what has been written to the error
     * stream so far.
     */
    private void assertErrorOutputContains( String... fragments )
    {
        String text = errBuffer.toString();
        for ( String fragment : fragments )
        {
            assertTrue( text.contains( fragment ) );
        }
    }
}
@@ -0,0 +1,86 @@
/*
* Copyright (c) 2002-2018 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.unsafe.impl.batchimport;

import java.util.function.LongSupplier;

import org.neo4j.kernel.impl.store.format.RecordFormats;
import org.neo4j.kernel.impl.util.OsBeanUtil;
import org.neo4j.unsafe.impl.batchimport.cache.MemoryStatsVisitor;
import org.neo4j.unsafe.impl.batchimport.input.Input;

import static org.neo4j.kernel.impl.util.OsBeanUtil.VALUE_UNAVAILABLE;
import static org.neo4j.unsafe.impl.batchimport.ImportMemoryCalculator.estimatedCacheSize;
import static org.neo4j.unsafe.impl.batchimport.ImportMemoryCalculator.optimalMinimalHeapSize;

/**
 * Compares the memory an import is estimated to need, derived from
 * {@link org.neo4j.unsafe.impl.batchimport.input.Input.Estimates}, with the heap size and free memory at hand.
 * At most one warning per check is reported to the supplied {@link ImportLogic.Monitor}.
 */
public class HeapSizeSanityChecker
{
    private final ImportLogic.Monitor monitor;
    private final LongSupplier freeMemoryLookup;
    private final LongSupplier actualHeapSizeLookup;

    /**
     * Creates a checker that reads free memory from {@link OsBeanUtil#getFreePhysicalMemory()} and the heap size
     * from {@link Runtime#maxMemory()}.
     *
     * @param monitor receiver of warnings.
     */
    HeapSizeSanityChecker( ImportLogic.Monitor monitor )
    {
        this( monitor, OsBeanUtil::getFreePhysicalMemory, Runtime.getRuntime()::maxMemory );
    }

    /**
     * Creates a checker with explicit sources for the memory figures.
     *
     * @param monitor receiver of warnings.
     * @param freeMemoryLookup supplies the free memory, or {@link OsBeanUtil#VALUE_UNAVAILABLE} if unknown.
     * @param actualHeapSizeLookup supplies the heap size to check.
     */
    public HeapSizeSanityChecker( ImportLogic.Monitor monitor, LongSupplier freeMemoryLookup, LongSupplier actualHeapSizeLookup )
    {
        this.monitor = monitor;
        this.freeMemoryLookup = freeMemoryLookup;
        this.actualHeapSizeLookup = actualHeapSizeLookup;
    }

    /**
     * Runs the checks in order of severity and reports only the first one that triggers:
     * insufficient total memory, then insufficient heap, then a heap that is abundant.
     *
     * @param inputEstimates estimates of the input, used to compute the optimal minimal heap size.
     * @param recordFormats record formats, used to compute the optimal minimal heap size.
     * @param baseMemory first contributor to the estimated cache size.
     * @param memoryVisitables further contributors to the estimated cache size.
     */
    public void sanityCheck( Input.Estimates inputEstimates, RecordFormats recordFormats, MemoryStatsVisitor.Visitable baseMemory,
            MemoryStatsVisitor.Visitable... memoryVisitables )
    {
        // The store has not been started at this point, so the free memory reported by the OS does not yet
        // account for it. That is why it has to be part of the estimated cache size here.
        long cacheSize = estimatedCacheSize( baseMemory, memoryVisitables );
        long free = freeMemoryLookup.getAsLong();
        long heapNeeded = optimalMinimalHeapSize( inputEstimates, recordFormats );
        long heap = actualHeapSizeLookup.getAsLong();
        boolean freeUnknown = free == VALUE_UNAVAILABLE;

        if ( !freeUnknown && heap + free < cacheSize + heapNeeded )
        {
            // Memory is likely insufficient overall, which makes the other warnings pointless
            monitor.insufficientAvailableMemory( cacheSize, heapNeeded, free );
        }
        else if ( heap < heapNeeded )
        {
            // The heap alone is too small for the import
            monitor.insufficientHeapSize( heapNeeded, heap );
        }
        else if ( (freeUnknown || free < cacheSize) && heap > heapNeeded * 1.2 )
        {
            // The heap is more than 20% above what is needed, and free memory is either unknown or too small
            // for the caches, so the heap size could be tweaked
            monitor.abundantHeapSize( heapNeeded, heap );
        }
    }
}
Expand Up @@ -92,6 +92,12 @@ public interface Monitor
void mayExceedNodeIdCapacity( long capacity, long estimatedCount ); void mayExceedNodeIdCapacity( long capacity, long estimatedCount );


void mayExceedRelationshipIdCapacity( long capacity, long estimatedCount ); void mayExceedRelationshipIdCapacity( long capacity, long estimatedCount );

/**
 * Reported by {@link HeapSizeSanityChecker} when the actual heap size is smaller than the optimal minimal heap
 * size calculated for the import.
 *
 * @param optimalMinimalHeapSize heap size calculated from the input estimates and record formats.
 * @param heapSize actual heap size that was checked.
 */
void insufficientHeapSize( long optimalMinimalHeapSize, long heapSize );

/**
 * Reported by {@link HeapSizeSanityChecker} when the actual heap size exceeds the optimal minimal heap size by
 * more than 20% while the free memory is either unknown or smaller than the estimated cache size.
 *
 * @param optimalMinimalHeapSize heap size calculated from the input estimates and record formats.
 * @param heapSize actual heap size that was checked.
 */
void abundantHeapSize( long optimalMinimalHeapSize, long heapSize );

/**
 * Reported by {@link HeapSizeSanityChecker} when the free memory is known and, added to the actual heap size,
 * is less than the estimated cache size plus the optimal minimal heap size.
 *
 * @param estimatedCacheSize estimated size of the caches the import needs.
 * @param optimalMinimalHeapSize heap size calculated from the input estimates and record formats.
 * @param availableMemory free memory as looked up by the checker; this figure does not include the heap.
 */
void insufficientAvailableMemory( long estimatedCacheSize, long optimalMinimalHeapSize, long availableMemory );
} }


public static final Monitor NO_MONITOR = new Monitor() public static final Monitor NO_MONITOR = new Monitor()
Expand All @@ -110,6 +116,21 @@ public void mayExceedNodeIdCapacity( long capacity, long estimatedCount )
public void doubleRelationshipRecordUnitsEnabled() public void doubleRelationshipRecordUnitsEnabled()
{ // no-op { // no-op
} }

@Override
public void insufficientHeapSize( long optimalMinimalHeapSize, long heapSize )
{ // no-op: NO_MONITOR deliberately ignores the "heap too small" notification
}

@Override
public void abundantHeapSize( long optimalMinimalHeapSize, long heapSize )
{ // no-op: NO_MONITOR deliberately ignores the "heap unnecessarily large" notification
}

@Override
public void insufficientAvailableMemory( long estimatedCacheSize, long optimalMinimalHeapSize, long availableMemory )
{ // no-op: NO_MONITOR deliberately ignores the "not enough available memory" notification
}
}; };


private final File storeDir; private final File storeDir;
Expand Down Expand Up @@ -179,7 +200,13 @@ public void initialize( Input input ) throws IOException
idMapper = input.idMapper( numberArrayFactory ); idMapper = input.idMapper( numberArrayFactory );
nodeRelationshipCache = new NodeRelationshipCache( numberArrayFactory, config.denseNodeThreshold() ); nodeRelationshipCache = new NodeRelationshipCache( numberArrayFactory, config.denseNodeThreshold() );
Estimates inputEstimates = input.calculateEstimates( neoStore.getPropertyStore().newValueEncodedSizeCalculator() ); Estimates inputEstimates = input.calculateEstimates( neoStore.getPropertyStore().newValueEncodedSizeCalculator() );
sanityCheckEstimatesWithRecordFormat( inputEstimates );
// Sanity checking against estimates
new EstimationSanityChecker( recordFormats, monitor ).sanityCheck( inputEstimates );
new HeapSizeSanityChecker( monitor ).sanityCheck( inputEstimates, recordFormats, neoStore,
nodeRelationshipCache.memoryEstimation( inputEstimates.numberOfNodes() ),
idMapper.memoryEstimation( inputEstimates.numberOfNodes() ) );

dependencies.satisfyDependencies( inputEstimates, idMapper, neoStore, nodeRelationshipCache ); dependencies.satisfyDependencies( inputEstimates, idMapper, neoStore, nodeRelationshipCache );


if ( neoStore.determineDoubleRelationshipRecordUnits( inputEstimates ) ) if ( neoStore.determineDoubleRelationshipRecordUnits( inputEstimates ) )
Expand All @@ -190,11 +217,6 @@ public void initialize( Input input ) throws IOException
executionMonitor.initialize( dependencies ); executionMonitor.initialize( dependencies );
} }


private void sanityCheckEstimatesWithRecordFormat( Estimates inputEstimates )
{
new EstimationSanityChecker( recordFormats, monitor ).sanityCheck( inputEstimates );
}

/** /**
* Accesses state of a certain {@code type}. This is state that may be long- or short-lived and perhaps * Accesses state of a certain {@code type}. This is state that may be long- or short-lived and perhaps
* created in one part of the import to be used in another. * created in one part of the import to be used in another.
Expand Down

0 comments on commit ff082ee

Please sign in to comment.