Utilizes available memory and imports multiple types at a time
Instead of only a single type at a time. This re-introduces caching of
relationship heads for multiple types per dense node in NodeRelationshipCache,
getting the best of both the previous strategy of always importing all
relationships in one round AND the current strategy of importing one type per
round (which reduces memory consumption). The amount of available memory now
decides how many rounds of relationship import are required.

This also removes the per-type-splitting on-disk caching of the input data
that was done when importing relationships of the first (and biggest)
relationship type. In most cases this improves overall import performance and
avoids stretches of the relationship import where seemingly no progress was
made because only relationships were being cached.

There's now an additional setting for how much memory the importer may use as
a whole, which by default is based on the amount of free physical memory on
the machine.

The defragmentation of relationship groups after relationship import also
makes use of the available memory and can reduce the number of rounds needed.
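
To illustrate the idea, here is a minimal sketch of how a memory budget could
decide the number of relationship import rounds. It is not the actual
NodeRelationshipCache logic; the per-type cost model and all names are
assumptions made for illustration:

// Illustrative sketch only -- the real importer measures memory differently.
static int relationshipImportRounds( long maxMemory, long denseNodeCount,
        int relationshipTypeCount, int bytesPerTypePerDenseNode )
{
    // Guard against a zero cost so the division below is always safe.
    long costPerType = Math.max( 1, denseNodeCount * bytesPerTypePerDenseNode );
    // Always import at least one type per round; more if the budget allows.
    int typesPerRound = (int) Math.max( 1, maxMemory / costPerType );
    // Ceiling division: the last round may import fewer types.
    return (relationshipTypeCount + typesPerRound - 1) / typesPerRound;
}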
tinwelint committed Apr 18, 2017
1 parent 6ac8938 commit 0fa339f
Showing 31 changed files with 825 additions and 932 deletions.
@@ -101,6 +101,7 @@ public class ParallelBatchImporterTest
private static final int NODE_COUNT = 10_000;
private static final int RELATIONSHIPS_PER_NODE = 5;
private static final int RELATIONSHIP_COUNT = NODE_COUNT * RELATIONSHIPS_PER_NODE;
private static final int RELATIONSHIP_TYPES = 3;
protected final Configuration config = new Configuration()
{
@Override
@@ -124,6 +125,17 @@ public int maxNumberOfProcessors()
int cores = Runtime.getRuntime().availableProcessors();
return random.intBetween( cores, cores + 100 );
}

@Override
public long maxMemoryUsage()
{
// This calculation just tries to hit some sort of memory limit so that relationship import
// is split up into multiple rounds, and to see that relationship group defragmentation
// works well across multiple rounds.
double ratio = (NODE_COUNT / 1_000D );
long mebi = 1024 * 1024;
return random.nextInt( (int) (ratio * mebi / 2), (int) (ratio * mebi) );
}
};
private final InputIdGenerator inputIdGenerator;
private final IdMapper idMapper;
@@ -253,7 +265,7 @@ public abstract static class InputIdGenerator

String randomType( Random random )
{
return "TYPE" + random.nextInt( 3 );
return "TYPE" + random.nextInt( RELATIONSHIP_TYPES );
}

@Override
@@ -44,6 +44,7 @@
import org.neo4j.io.fs.DefaultFileSystemAbstraction;
import org.neo4j.io.fs.FileSystemAbstraction;
import org.neo4j.kernel.configuration.Config;
import org.neo4j.kernel.configuration.Settings;
import org.neo4j.kernel.impl.logging.LogService;
import org.neo4j.kernel.impl.logging.StoreLogService;
import org.neo4j.kernel.impl.storemigration.ExistingTargetStrategy;
@@ -82,6 +83,8 @@
import static org.neo4j.kernel.configuration.Settings.parseLongWithUnit;
import static org.neo4j.kernel.impl.util.Converters.withDefault;
import static org.neo4j.unsafe.impl.batchimport.Configuration.BAD_FILE_NAME;
import static org.neo4j.unsafe.impl.batchimport.Configuration.DEFAULT_MAX_MEMORY_PERCENT;
import static org.neo4j.unsafe.impl.batchimport.Configuration.calculateMaxMemoryFromPercent;
import static org.neo4j.unsafe.impl.batchimport.input.Collectors.badCollector;
import static org.neo4j.unsafe.impl.batchimport.input.Collectors.collect;
import static org.neo4j.unsafe.impl.batchimport.input.Collectors.silentBadCollector;
@@ -99,7 +102,6 @@
public class ImportTool
{
private static final String UNLIMITED = "true";
private static final int UNSPECIFIED = -1;

enum Options
{
@@ -232,7 +234,13 @@ enum Options
READ_BUFFER_SIZE( "read-buffer-size", org.neo4j.csv.reader.Configuration.DEFAULT.bufferSize(),
"<bytes, e.g. 10k, 4M>",
"Size of each buffer for reading input data. It has to at least be large enough to hold the " +
"biggest single value in the input data." );
"biggest single value in the input data." ),
MAX_MEMORY( "max-memory", null,
"<max memory that importer can use>",
"(advanced) Maximum memory that importer can use for various data structures and caching " +
"to improve performance. By default set to " + DEFAULT_MAX_MEMORY_PERCENT +
"% of (free memory on machine - max JVM memory). " +
"Values can be plain numbers, like 10000000 or e.g. 20G for 20 gigabyte, or even e.g. 70%." );

private final String key;
private final Object defaultValue;
@@ -378,10 +386,10 @@ public static void main( String[] incomingArguments, boolean defaultSettingsSuitableForTests
Config dbConfig;
OutputStream badOutput = null;
IdType idType = null;
int pageSize = UNSPECIFIED;
org.neo4j.unsafe.impl.batchimport.Configuration configuration = null;
File logsDir;
File badFile = null;
Long maxMemory = null;

boolean success = false;
try
@@ -402,6 +410,8 @@ public static void main( String[] incomingArguments, boolean defaultSettingsSuitableForTests
}
nodesFiles = extractInputFiles( args, Options.NODE_DATA.key(), err );
relationshipsFiles = extractInputFiles( args, Options.RELATIONSHIP_DATA.key(), err );
String maxMemoryString = args.get( Options.MAX_MEMORY.key(), null );
maxMemory = parseMaxMemory( maxMemoryString );

validateInputFiles( nodesFiles, relationshipsFiles );
enableStacktrace = args.getBoolean( Options.STACKTRACE.key(), Boolean.FALSE, Boolean.TRUE );
@@ -423,7 +433,7 @@ public static void main( String[] incomingArguments, boolean defaultSettingsSuitableForTests

dbConfig = loadDbConfig( args.interpretOption( Options.DATABASE_CONFIG.key(), Converters.<File>optional(),
Converters.toFile(), Validators.REGEX_FILE_EXISTS ) );
configuration = importConfiguration( processors, defaultSettingsSuitableForTests, dbConfig, pageSize );
configuration = importConfiguration( processors, defaultSettingsSuitableForTests, dbConfig, maxMemory );
input = new CsvInput( nodeData( inputEncoding, nodesFiles ), defaultFormatNodeFileHeader(),
relationshipData( inputEncoding, relationshipsFiles ), defaultFormatRelationshipFileHeader(),
idType, csvConfiguration( args, defaultSettingsSuitableForTests ), badCollector,
@@ -451,6 +461,20 @@ idType, csvConfiguration( args, defaultSettingsSuitableForTests ), badCollector,
}
}

private static Long parseMaxMemory( String maxMemoryString )
{
if ( maxMemoryString != null )
{
if ( maxMemoryString.endsWith( "%" ) )
{
int percent = Integer.parseInt( maxMemoryString.substring( 0, maxMemoryString.length() - 1 ) );
return calculateMaxMemoryFromPercent( percent );
}
return Settings.parseLongWithUnit( maxMemoryString );
}
return null;
}
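// Illustration, not part of this commit: given the MAX_MEMORY description above,
// calculateMaxMemoryFromPercent could plausibly be implemented along these lines
// (an assumption -- the actual implementation lives in batchimport's Configuration):
//
//   long freeMachineMemory = OsBeanUtil.getFreePhysicalMemory();
//   long available = freeMachineMemory - Runtime.getRuntime().maxMemory();
//   return available * percent / 100;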

public static void doImport( PrintStream out, PrintStream err, File storeDir, File logsDir, File badFile,
FileSystemAbstraction fs, Collection<Option<File[]>> nodesFiles,
Collection<Option<File[]>> relationshipsFiles, boolean enableStacktrace, Input input,
@@ -570,9 +594,11 @@ private static void printOverview( File storeDir, Collection<Option<File[]>> nodesFiles
printInputFiles( "Relationships", relationshipsFiles, out );
out.println();
out.println( "Available resources:" );
printIndented( "Total machine memory: " + bytes( OsBeanUtil.getTotalPhysicalMemory() ), out );
printIndented( "Free machine memory: " + bytes( OsBeanUtil.getFreePhysicalMemory() ), out );
printIndented( "Max heap memory : " + bytes( Runtime.getRuntime().maxMemory() ), out );
printIndented( "Processors: " + configuration.maxNumberOfProcessors(), out );
printIndented( "Configured max memory: " + bytes( configuration.maxMemoryUsage() ), out );
out.println();
}

@@ -623,11 +649,11 @@ public static void validateInputFiles( Collection<Option<File[]>> nodesFiles,
public static org.neo4j.unsafe.impl.batchimport.Configuration importConfiguration( final Number processors,
final boolean defaultSettingsSuitableForTests, final Config dbConfig )
{
return importConfiguration( processors, defaultSettingsSuitableForTests, dbConfig, UNSPECIFIED );
return importConfiguration( processors, defaultSettingsSuitableForTests, dbConfig, null );
}

public static org.neo4j.unsafe.impl.batchimport.Configuration importConfiguration( final Number processors,
final boolean defaultSettingsSuitableForTests, final Config dbConfig, int pageSize )
final boolean defaultSettingsSuitableForTests, final Config dbConfig, Long maxMemory )
{
return new org.neo4j.unsafe.impl.batchimport.Configuration()
{
@@ -648,6 +674,12 @@ public int denseNodeThreshold()
{
return dbConfig.get( GraphDatabaseSettings.dense_node_threshold );
}

@Override
public long maxMemoryUsage()
{
return maxMemory != null ? maxMemory.longValue() : DEFAULT.maxMemoryUsage();
}
};
}
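
// Example (illustrative, not part of this commit): passing an explicit value
// overrides the default, while null falls back to DEFAULT.maxMemoryUsage():
//
//   Configuration config = importConfiguration( null, false, dbConfig, 2L * 1024 * 1024 * 1024 );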

@@ -1741,6 +1741,52 @@ public void shouldRespectBufferSizeSetting() throws Exception
}
}

@Test
public void shouldRespectMaxMemoryPercentageSetting() throws Exception
{
// GIVEN
List<String> nodeIds = nodeIds( 10 );

// WHEN
importTool(
"--into", dbRule.getStoreDirAbsolutePath(),
"--nodes", nodeData( true, Configuration.COMMAS, nodeIds, TRUE ).getAbsolutePath(),
"--max-memory", "60%" );
}

@Test
public void shouldFailOnInvalidMaxMemoryPercentageSetting() throws Exception
{
// GIVEN
List<String> nodeIds = nodeIds( 10 );

try
{
// WHEN
importTool( "--into", dbRule.getStoreDirAbsolutePath(), "--nodes",
nodeData( true, Configuration.COMMAS, nodeIds, TRUE ).getAbsolutePath(), "--max-memory", "110%" );
fail( "Should have failed" );
}
catch ( IllegalArgumentException e )
{
// THEN good
assertThat( e.getMessage(), containsString( "percent" ) );
}
}

@Test
public void shouldRespectMaxMemorySuffixedSetting() throws Exception
{
// GIVEN
List<String> nodeIds = nodeIds( 10 );

// WHEN
importTool(
"--into", dbRule.getStoreDirAbsolutePath(),
"--nodes", nodeData( true, Configuration.COMMAS, nodeIds, TRUE ).getAbsolutePath(),
"--max-memory", "100M" );
}

private File writeArrayCsv( String[] headers, String[] values ) throws FileNotFoundException
{
File data = file( fileName( "whitespace.csv" ) );
@@ -233,6 +233,16 @@ public static int safeCastLongToInt( long value )
return (int) value;
}

public static short safeCastIntToUnsignedShort( int value )
{
if ( (value & ~0xFFFF) != 0 )
{
throw new IllegalArgumentException(
"Casting int value " + value + " to an unsigned short would wrap around" );
}
return (short) value;
}

public static int shortToUnsignedInt( short value )
{
return value & 0xFFFF;
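
As an illustration (not part of this commit), the new cast composes with the
existing shortToUnsignedInt to round-trip values in the unsigned 16-bit range:

// Illustrative usage, assuming both helpers are on the same utility class:
short packed = safeCastIntToUnsignedShort( 40_000 );  // fits: 0 <= 40_000 <= 0xFFFF
int restored = shortToUnsignedInt( packed );          // restores 40_000
// safeCastIntToUnsignedShort( 70_000 ) would throw IllegalArgumentException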

This file was deleted.
