Bigger page size during import
to 8 MiB instead of the default 8 KiB. The amount of mapped memory is set to
a low number of pages, just enough to handle a couple of store updates
during a single stage while leaving some room for the page cache to
maneuver. Stores are also no longer explicitly flushed after each batch,
due to the low number of pages.

The effects of this change are:
- configuration code is simpler, since memory size no longer needs to be
  calculated from batch size.
- less overhead is spent reading/writing pages, which improves overall import
  performance, since those steps are usually the bottlenecks.
- it helps with a scenario during the Relationship-->Relationship linkback
  stage where, on the Windows OS/FS, memory consumption seemingly increases
  slightly with each I/O access during the backward scan of the relationship
  store. A bigger page size means fewer I/O accesses, heavily reducing this
  problem.

Overall, this change has been seen to improve I/O performance by at least
30-50% when doing an import.
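As a rough sketch of the sizing rule this commit introduces (a hypothetical standalone rendering; the variable names are illustrative, while the calls and constants are taken from the diff below):

    long pageSize = 8 * 1024 * 1024;                      // 8 MiB, up from the 8 KiB default
    long optimalMappedMemory = pageSize * 40;             // ~320 MiB: the ~30 pages the cache keeps free, plus legroom
    long freePhysical = OsBeanUtil.getFreePhysicalMemory();
    long mappedMemory = freePhysical == OsBeanUtil.VALUE_UNAVAILABLE
            ? optimalMappedMemory                         // no hint about free memory, go with the optimum
            : Math.min( optimalMappedMemory, freePhysical / 5 ); // leave room for the rest of the importer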
tinwelint committed Mar 7, 2016
1 parent 6442209 commit 4a887b2
Showing 8 changed files with 44 additions and 90 deletions.
@@ -75,6 +75,7 @@
import static org.neo4j.helpers.Exceptions.launderedException;
import static org.neo4j.helpers.Format.bytes;
import static org.neo4j.helpers.Strings.TAB;
import static org.neo4j.io.ByteUnit.kibiBytes;
import static org.neo4j.kernel.impl.util.Converters.withDefault;
import static org.neo4j.unsafe.impl.batchimport.Configuration.BAD_FILE_NAME;
import static org.neo4j.unsafe.impl.batchimport.input.Collectors.badCollector;
@@ -496,12 +497,10 @@ private static org.neo4j.unsafe.impl.batchimport.Configuration importConfigurati
{
return new org.neo4j.unsafe.impl.batchimport.Configuration.Default()
{
private static final int WRITE_BUFFER_SIZE_FOR_TEST = 1024 * 1024 * 8; // 8 MiB

@Override
public int writeBufferSize()
public long pageSize()
{
return defaultSettingsSuitableForTests? WRITE_BUFFER_SIZE_FOR_TEST : super.writeBufferSize();
return defaultSettingsSuitableForTests ? kibiBytes( 32 ) : super.pageSize();
}

@Override
@@ -22,7 +22,7 @@
import org.neo4j.graphdb.factory.GraphDatabaseSettings;
import org.neo4j.kernel.configuration.Config;

import static java.lang.Math.round;
import static org.neo4j.io.ByteUnit.mebiBytes;

/**
* User controlled configuration for a {@link BatchImporter}.
@@ -36,45 +36,29 @@ public interface Configuration extends org.neo4j.unsafe.impl.batchimport.staging
String BAD_FILE_NAME = "bad.log";

/**
* Memory dedicated to buffering data to be written.
* @return number of relationships threshold for considering a node dense.
*/
int writeBufferSize();
int denseNodeThreshold();

/**
* The number of relationships threshold for considering a node dense.
* @return page size for the page cache managing the store.
*/
int denseNodeThreshold();
long pageSize();

class Default
extends org.neo4j.unsafe.impl.batchimport.staging.Configuration.Default
implements Configuration
{
private static final int DEFAULT_PAGE_SIZE = 1024 * 8;

@Override
public int batchSize()
{
return 10_000;
}

@Override
public int writeBufferSize()
{
// Do a little calculation here where the goal of the returned value is that if a file channel
// would be seen as a batch itself (think asynchronous writing) there would be created roughly
// as many as the other types of batches.
int averageRecordSize = 40; // Gut-feel estimate
int batchesToBuffer = 1000;
int maxWriteBufferSize = batchSize() * averageRecordSize * batchesToBuffer;
int writeBufferSize = (int) Math.min( maxWriteBufferSize, Runtime.getRuntime().maxMemory() / 5);
return roundToClosest( writeBufferSize, DEFAULT_PAGE_SIZE * 30 );
}

private int roundToClosest( int value, int divisible )
public long pageSize()
{
double roughCount = (double) value / divisible;
int count = (int) round( roughCount );
return divisible*count;
return mebiBytes( 8 );
}

@Override
@@ -112,9 +96,9 @@ public Overridden( Config config )
}

@Override
public int writeBufferSize()
public long pageSize()
{
return defaults.writeBufferSize();
return defaults.pageSize();
}

@Override
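For code embedding the importer, a minimal usage sketch (assumed usage, not part of this diff) of overriding the new pageSize() default, in the same anonymous-subclass style ImportTool uses above:

    Configuration config = new Configuration.Default()
    {
        @Override
        public long pageSize()
        {
            return mebiBytes( 4 ); // hypothetical override: a smaller page size for a constrained machine
        }
    };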
@@ -39,7 +39,6 @@
import org.neo4j.unsafe.impl.batchimport.store.io.IoMonitor;

import static java.lang.Math.max;
import static org.neo4j.unsafe.impl.batchimport.Batch.EMPTY;

/**
* Writes {@link RECORD entity batches} to the underlying stores. Also makes final composition of the
@@ -141,8 +140,6 @@ protected void process( Batch<INPUT,RECORD> batch, BatchSender sender )

monitor.entitiesWritten( records[0].getClass(), records.length-skipped );
monitor.propertiesWritten( propertyBlockCursor );

sender.send( EMPTY ); // allow for the store flusher step right after us to execute
}

private void reassignDynamicRecordIds( PropertyBlock[] blocks, int offset, int length )
@@ -63,6 +63,5 @@ public NodeStage( Configuration config, IoMonitor writeMonitor,
add( new LabelScanStorePopulationStep( control(), config, labelScanStore ) );
add( new EntityStoreUpdaterStep<>( control(), config, nodeStore, propertyStore,
writeMonitor, storeUpdateMonitor ) );
add( new StoreFlusherStep( control(), config, nodeStore, propertyStore ) );
}
}
@@ -54,6 +54,5 @@ public RelationshipStage( Configuration config, IoMonitor writeMonitor,
neoStore.getRelationshipTypeRepository(), cache, specificIds ) );
add( new EntityStoreUpdaterStep<>( control(), config,
relationshipStore, propertyStore, writeMonitor, storeUpdateMonitor ) );
add( new StoreFlusherStep( control(), config, relationshipStore, propertyStore ) );
}
}

This file was deleted.

@@ -64,10 +64,6 @@ record = (RECORD) record.clone();
}
update( record );
}

// Flush after each batch.
// We get vectored, sequential IO when we write with flush, plus it makes future page faulting faster.
store.flush();
recordsUpdated += batch.length;
}

@@ -47,6 +47,7 @@
import org.neo4j.kernel.impl.store.counts.CountsTracker;
import org.neo4j.kernel.impl.transaction.state.NeoStoresSupplier;
import org.neo4j.kernel.impl.util.Dependencies;
import org.neo4j.kernel.impl.util.OsBeanUtil;
import org.neo4j.kernel.lifecycle.LifeSupport;
import org.neo4j.logging.LogProvider;
import org.neo4j.logging.NullLogProvider;
@@ -58,9 +59,11 @@
import org.neo4j.unsafe.impl.batchimport.store.BatchingTokenRepository.BatchingRelationshipTypeTokenRepository;
import org.neo4j.unsafe.impl.batchimport.store.io.IoTracer;

import static java.lang.Math.min;
import static java.lang.String.valueOf;

import static org.neo4j.graphdb.factory.GraphDatabaseSettings.dense_node_threshold;
import static org.neo4j.graphdb.factory.GraphDatabaseSettings.mapped_memory_page_size;
import static org.neo4j.graphdb.factory.GraphDatabaseSettings.pagecache_memory;
import static org.neo4j.helpers.collection.MapUtil.stringMap;

@@ -89,9 +92,16 @@ public BatchingNeoStores( FileSystemAbstraction fileSystem, File storeDir, Confi
this.fileSystem = fileSystem;
this.logProvider = logService.getInternalLogProvider();
this.storeDir = storeDir;

long pageSize = config.pageSize();
// 30 is the minimum number of pages the page cache wants to keep free at all times.
// Having fewer than that might result in a page being evicted while reading, which would mean
// unnecessary re-reading. Having slightly more leaves some legroom.
long optimalMappedMemorySize = pageSize * 40;
this.neo4jConfig = new Config( stringMap( dbConfig.getParams(),
dense_node_threshold.name(), valueOf( config.denseNodeThreshold() ),
pagecache_memory.name(), valueOf( config.writeBufferSize() ) ),
pagecache_memory.name(), valueOf( applyEnvironmentLimitationsTo( optimalMappedMemorySize ) ),
mapped_memory_page_size.name(), valueOf( pageSize ) ),
GraphDatabaseSettings.class );
final PageCacheTracer tracer = new DefaultPageCacheTracer();
this.pageCache = createPageCache( fileSystem, neo4jConfig, logProvider, tracer );
@@ -156,6 +166,27 @@ public File storeDir()
LabelScanStoreProvider.HIGHEST_PRIORITIZED ).getLabelScanStore() );
}

/**
* An attempt to limit the amount of memory used by the page cache in a severely limited environment.
* This shouldn't be a problem in most scenarios since the optimal mapped memory size is in the range
* of 100-200 MiB and so shouldn't make a noticeable dent in memory usage.
*
* @param optimalMappedMemorySize amount of mapped memory that would be considered optimal for the import.
* @return in most cases the optimal size, although in some very limited environments a smaller size.
*/
private long applyEnvironmentLimitationsTo( long optimalMappedMemorySize )
{
long freePhysicalMemory = OsBeanUtil.getFreePhysicalMemory();
if ( freePhysicalMemory == OsBeanUtil.VALUE_UNAVAILABLE )
{
// We have no idea how much free memory there is, let's simply go with what we'd like to have
return optimalMappedMemorySize;
}
// We got a hint about the amount of free memory. Let's take at most a fifth of the free memory
// since other parts of the importer also need memory to function.
return min( optimalMappedMemorySize, freePhysicalMemory / 5 );
}
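To make the cap concrete, a worked example with assumed numbers (the ~320 MiB optimum follows from the 8 MiB page size times 40 pages in the constructor above):

    long optimal = 8L * 1024 * 1024 * 40;                // 335544320 bytes, ~320 MiB
    long freePhysical = 1024L * 1024 * 1024;             // suppose 1 GiB reported free
    long mapped = Math.min( optimal, freePhysical / 5 ); // 214748364 bytes, ~204 MiB, so the cap applies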

private static PageCache createPageCache( FileSystemAbstraction fileSystem, Config config, LogProvider log,
PageCacheTracer tracer )
{
