Skip to content

Commit

Permalink
Configurable page size in importer
Browse files Browse the repository at this point in the history
  • Loading branch information
tinwelint committed Aug 29, 2016
1 parent 1762664 commit 0f457f4
Show file tree
Hide file tree
Showing 5 changed files with 116 additions and 74 deletions.
Expand Up @@ -37,13 +37,15 @@
import org.neo4j.helpers.Args.Option;
import org.neo4j.helpers.ArrayUtil;
import org.neo4j.helpers.Exceptions;
import org.neo4j.helpers.Format;
import org.neo4j.helpers.Strings;
import org.neo4j.helpers.collection.IterableWrapper;
import org.neo4j.helpers.collection.Iterables;
import org.neo4j.helpers.collection.MapUtil;
import org.neo4j.io.fs.DefaultFileSystemAbstraction;
import org.neo4j.io.fs.FileSystemAbstraction;
import org.neo4j.kernel.configuration.Config;
import org.neo4j.kernel.configuration.Settings;
import org.neo4j.kernel.impl.logging.LogService;
import org.neo4j.kernel.impl.logging.StoreLogService;
import org.neo4j.kernel.impl.storemigration.ExistingTargetStrategy;
Expand Down Expand Up @@ -71,7 +73,9 @@
import org.neo4j.unsafe.impl.batchimport.input.csv.IdType;
import org.neo4j.unsafe.impl.batchimport.staging.ExecutionMonitors;

import static java.lang.Math.toIntExact;
import static java.nio.charset.Charset.defaultCharset;

import static org.neo4j.helpers.Exceptions.launderedException;
import static org.neo4j.helpers.Format.bytes;
import static org.neo4j.helpers.Strings.TAB;
Expand Down Expand Up @@ -194,7 +198,10 @@ enum Options
+ "Examples of supported config are:\n"
+ GraphDatabaseSettings.dense_node_threshold.name() + "\n"
+ GraphDatabaseSettings.string_block_size.name() + "\n"
+ GraphDatabaseSettings.array_block_size.name() );
+ GraphDatabaseSettings.array_block_size.name() ),
PAGE_SIZE( "page-size", Format.bytes( org.neo4j.unsafe.impl.batchimport.Configuration.DEFAULT.pageSize() ),
"<page size in bytes",
"Page size in bytes, or e.g. 4M or 8k" );

private final String key;
private final Object defaultValue;
Expand Down Expand Up @@ -319,6 +326,7 @@ public static void main( String[] incomingArguments, boolean defaultSettingsSuit
boolean skipBadRelationships, skipDuplicateNodes, ignoreExtraColumns;
Config dbConfig;
OutputStream badOutput = null;
int pageSize = -1;
org.neo4j.unsafe.impl.batchimport.Configuration configuration = null;

boolean success = false;
Expand Down Expand Up @@ -351,7 +359,9 @@ public static void main( String[] incomingArguments, boolean defaultSettingsSuit

dbConfig = loadDbConfig( args.interpretOption( Options.DATABASE_CONFIG.key(), Converters.<File>optional(),
Converters.toFile(), Validators.REGEX_FILE_EXISTS ) );
configuration = importConfiguration( processors, defaultSettingsSuitableForTests, dbConfig );
pageSize = toIntExact( Settings.parseLongWithUnit( args.get( Options.PAGE_SIZE.key(),
String.valueOf( org.neo4j.unsafe.impl.batchimport.Configuration.DEFAULT.pageSize() ) ) ) );
configuration = importConfiguration( processors, defaultSettingsSuitableForTests, dbConfig, pageSize );
input = new CsvInput( nodeData( inputEncoding, nodesFiles ), defaultFormatNodeFileHeader(),
relationshipData( inputEncoding, relationshipsFiles ), defaultFormatRelationshipFileHeader(),
idType, csvConfiguration( args, defaultSettingsSuitableForTests ), badCollector,
Expand Down Expand Up @@ -497,7 +507,7 @@ private static void validateInputFiles( Collection<Option<File[]>> nodesFiles,
}

private static org.neo4j.unsafe.impl.batchimport.Configuration importConfiguration( final Number processors,
final boolean defaultSettingsSuitableForTests, final Config dbConfig )
final boolean defaultSettingsSuitableForTests, final Config dbConfig, int pageSize )
{
return new org.neo4j.unsafe.impl.batchimport.Configuration.Default()
{
Expand All @@ -518,6 +528,12 @@ public int denseNodeThreshold()
{
return dbConfig.get( GraphDatabaseSettings.dense_node_threshold );
}

@Override
public int pageSize()
{
return pageSize;
}
};
}

Expand Down
Expand Up @@ -24,6 +24,7 @@
import static java.lang.Math.min;
import static org.neo4j.graphdb.factory.GraphDatabaseSettings.dense_node_threshold;
import static org.neo4j.graphdb.factory.GraphDatabaseSettings.pagecache_memory;
import static org.neo4j.io.ByteUnit.kibiBytes;
import static org.neo4j.io.ByteUnit.mebiBytes;

/**
Expand Down Expand Up @@ -52,6 +53,8 @@ public interface Configuration extends org.neo4j.unsafe.impl.batchimport.staging
*/
long pageCacheMemory();

int pageSize();

class Default
extends org.neo4j.unsafe.impl.batchimport.staging.Configuration.Default
implements Configuration
Expand All @@ -69,6 +72,27 @@ public int denseNodeThreshold()
{
return Integer.parseInt( dense_node_threshold.getDefaultValue() );
}

private static int calculateOptimalPageSize( long memorySize, int numberOfPages )
{
int pageSize = (int) mebiBytes( 8 );
int lowest = (int) kibiBytes( 8 );
while ( pageSize > lowest )
{
if ( memorySize / pageSize >= numberOfPages )
{
return pageSize;
}
pageSize >>>= 1;
}
return lowest;
}

@Override
public int pageSize()
{
return calculateOptimalPageSize( pageCacheMemory(), 60 );
}
}

Configuration DEFAULT = new Default();
Expand Down Expand Up @@ -114,6 +138,12 @@ public int movingAverageSize()
{
return defaults.movingAverageSize();
}

@Override
public int pageSize()
{
return defaults.pageSize();
}
}

public static Configuration withBatchSize( Configuration config, int batchSize )
Expand Down
Expand Up @@ -70,8 +70,6 @@
import static org.neo4j.graphdb.factory.GraphDatabaseSettings.mapped_memory_page_size;
import static org.neo4j.graphdb.factory.GraphDatabaseSettings.pagecache_memory;
import static org.neo4j.helpers.collection.MapUtil.stringMap;
import static org.neo4j.io.ByteUnit.kibiBytes;
import static org.neo4j.io.ByteUnit.mebiBytes;
import static org.neo4j.kernel.impl.store.MetaDataStore.DEFAULT_NAME;
import static org.neo4j.kernel.impl.store.StoreType.RELATIONSHIP_GROUP;
import static org.neo4j.kernel.impl.transaction.log.TransactionIdStore.BASE_TX_COMMIT_TIMESTAMP;
Expand Down Expand Up @@ -111,7 +109,7 @@ public BatchingNeoStores( FileSystemAbstraction fileSystem, File storeDir, Recor
// 30 is the minimum number of pages the page cache wants to keep free at all times.
// Having less than that might result in an evicted page will reading, which would mean
// unnecessary re-reading. Having slightly more leaves some leg room.
int pageSize = calculateOptimalPageSize( mappedMemory, 60 /*pages*/ );
int pageSize = config.pageSize();
this.neo4jConfig = new Config( stringMap( dbConfig.getParams(),
dense_node_threshold.name(), valueOf( config.denseNodeThreshold() ),
pagecache_memory.name(), valueOf( mappedMemory ),
Expand Down Expand Up @@ -167,21 +165,6 @@ public BatchingNeoStores( FileSystemAbstraction fileSystem, File storeDir, Recor
HighestSelectionStrategy.getInstance() ).getLabelScanStore() );
}

static int calculateOptimalPageSize( long memorySize, int numberOfPages )
{
int pageSize = (int) mebiBytes( 8 );
int lowest = (int) kibiBytes( 8 );
while ( pageSize > lowest )
{
if ( memorySize / pageSize >= numberOfPages )
{
return pageSize;
}
pageSize >>>= 1;
}
return lowest;
}

private static PageCache createPageCache( FileSystemAbstraction fileSystem, Config config, LogProvider log,
PageCacheTracer tracer )
{
Expand Down
Expand Up @@ -30,6 +30,8 @@
import static org.junit.Assert.assertTrue;
import static org.neo4j.graphdb.factory.GraphDatabaseSettings.pagecache_memory;
import static org.neo4j.helpers.collection.MapUtil.stringMap;
import static org.neo4j.io.ByteUnit.kibiBytes;
import static org.neo4j.io.ByteUnit.mebiBytes;
import static org.neo4j.kernel.configuration.Settings.parseLongWithUnit;
import static org.neo4j.unsafe.impl.batchimport.Configuration.MAX_PAGE_CACHE_MEMORY;

Expand Down Expand Up @@ -77,8 +79,72 @@ public void shouldParseDefaultPageCacheMemorySetting() throws Exception
assertTrue( within( memory, Config.defaults().get( pagecache_memory ), MAX_PAGE_CACHE_MEMORY ) );
}

@Test
public void shouldCalculateBigPageSizeForBiggerMemory() throws Exception
{
// GIVEN
Configuration config = configWithPageCacheMemory( mebiBytes( 240 ) );

// WHEN
int pageSize = config.pageSize();

// THEN
assertEquals( mebiBytes( 4 ), pageSize );
}

@Test
public void shouldCalculateSmallPageSizeForSmallerMemory() throws Exception
{
// GIVEN
Configuration config = configWithPageCacheMemory( mebiBytes( 100 ) );

// WHEN
int pageSize = config.pageSize();

// THEN
assertEquals( mebiBytes( 1 ), pageSize );
}

@Test
public void shouldNotGoLowerThan8kPageSizeForSmallMemory() throws Exception
{
// GIVEN
Configuration config = configWithPageCacheMemory( kibiBytes( 8*30 ) );

// WHEN
int pageSize = config.pageSize();

// THEN
assertEquals( kibiBytes( 8 ), pageSize );
}

@Test
public void shouldNotGoHigherThan8mPageSizeForBigMemory() throws Exception
{
// GIVEN
Configuration config = configWithPageCacheMemory( mebiBytes( 700 ) );

// WHEN
int pageSize = config.pageSize();

// THEN
assertEquals( mebiBytes( 8 ), pageSize );
}

private boolean within( long value, long firstBound, long otherBound )
{
return value >= min( firstBound, otherBound ) && value <= max( firstBound, otherBound );
}

private Configuration configWithPageCacheMemory( long bytes )
{
return new Configuration.Default()
{
@Override
public long pageCacheMemory()
{
return bytes;
}
};
}
}
Expand Up @@ -48,7 +48,6 @@
import static org.neo4j.io.ByteUnit.mebiBytes;
import static org.neo4j.unsafe.impl.batchimport.AdditionalInitialIds.EMPTY;
import static org.neo4j.unsafe.impl.batchimport.Configuration.DEFAULT;
import static org.neo4j.unsafe.impl.batchimport.store.BatchingNeoStores.calculateOptimalPageSize;

public class BatchingNeoStoresTest
{
Expand Down Expand Up @@ -104,58 +103,6 @@ public void shouldRespectDbConfig() throws Exception
}
}

@Test
public void shouldCalculateBigPageSizeForBiggerMemory() throws Exception
{
// GIVEN
long memorySize = mebiBytes( 240 );

// WHEN
int pageSize = calculateOptimalPageSize( memorySize, 60 );

// THEN
assertEquals( mebiBytes( 4 ), pageSize );
}

@Test
public void shouldCalculateSmallPageSizeForSmallerMemory() throws Exception
{
// GIVEN
long memorySize = mebiBytes( 100 );

// WHEN
int pageSize = calculateOptimalPageSize( memorySize, 60 );

// THEN
assertEquals( mebiBytes( 1 ), pageSize );
}

@Test
public void shouldNotGoLowerThan8kPageSizeForSmallMemory() throws Exception
{
// GIVEN
long memorySize = kibiBytes( 8*30 );

// WHEN
int pageSize = calculateOptimalPageSize( memorySize, 60 );

// THEN
assertEquals( kibiBytes( 8 ), pageSize );
}

@Test
public void shouldNotGoHigherThan8mPageSizeForBigMemory() throws Exception
{
// GIVEN
long memorySize = mebiBytes( 700 );

// WHEN
int pageSize = calculateOptimalPageSize( memorySize, 60 );

// THEN
assertEquals( mebiBytes( 8 ), pageSize );
}

private void someDataInTheDatabase()
{
GraphDatabaseService db = new TestGraphDatabaseFactory().setFileSystem( fsr.get() )
Expand Down

0 comments on commit 0f457f4

Please sign in to comment.