Skip to content

Commit

Permalink
PBI: Page cache backed NumberArrayFactory, first draft
Browse files Browse the repository at this point in the history
This adds a new NumberArrayFactory that builds number arrays that use the page
cache as backing memory. This is only used a the last fall-back for the
ChunkedNumberArrayFactory for allocating chunks, so it will be used as little
as possible. However, it will allow the importer to import graphs on systems
where for instance the IdMapper cannot fit in the available memory of the
system.

There is one outstanding issue in this commit, and that is that the
NumberArrays can be exposed to concurrent access from multiple threads. The
PageCache backed NumberArray implementations are currently single-threaded,
becuase they reuse the same pre-allocated PageCursors over and over again. If
the array is accessed concurrently, then this can cause a cursor to move while
another thread is reading from the pinned page.
  • Loading branch information
chrisvest committed Jul 14, 2017
1 parent 9599051 commit 7d29f1b
Show file tree
Hide file tree
Showing 43 changed files with 1,132 additions and 385 deletions.
Expand Up @@ -24,6 +24,7 @@
import org.neo4j.unsafe.impl.batchimport.IdRangeInput.Range;
import org.neo4j.unsafe.impl.batchimport.InputIterable;
import org.neo4j.unsafe.impl.batchimport.InputIterator;
import org.neo4j.unsafe.impl.batchimport.cache.NumberArrayFactory;
import org.neo4j.unsafe.impl.batchimport.cache.idmapping.IdGenerator;
import org.neo4j.unsafe.impl.batchimport.cache.idmapping.IdMapper;
import org.neo4j.unsafe.impl.batchimport.input.Collector;
Expand Down Expand Up @@ -122,9 +123,9 @@ public boolean supportsMultiplePasses()
}

@Override
public IdMapper idMapper()
public IdMapper idMapper( NumberArrayFactory numberArrayFactory )
{
return idType.idMapper();
return idType.idMapper( numberArrayFactory );
}

@Override
Expand Down
Expand Up @@ -28,11 +28,13 @@
import org.neo4j.unsafe.impl.batchimport.cache.NodeLabelsCache;
import org.neo4j.unsafe.impl.batchimport.cache.NumberArrayFactory;

import static org.neo4j.unsafe.impl.batchimport.cache.NumberArrayFactory.AUTO;
import static org.neo4j.unsafe.impl.batchimport.staging.ExecutionSupervisors.superviseDynamicExecution;

public class CountsComputer implements DataInitializer<CountsAccessor.Updater>
{

private final NumberArrayFactory numberArrayFactory;

public static void recomputeCounts( NeoStores stores )
{
MetaDataStore metaDataStore = stores.getMetaDataStore();
Expand All @@ -52,26 +54,27 @@ public static void recomputeCounts( NeoStores stores )
public CountsComputer( NeoStores stores )
{
this( stores.getMetaDataStore().getLastCommittedTransactionId(),
stores.getNodeStore(), stores.getRelationshipStore(),
(int) stores.getLabelTokenStore().getHighId(),
(int) stores.getRelationshipTypeTokenStore().getHighId() );
stores.getNodeStore(), stores.getRelationshipStore(),
(int) stores.getLabelTokenStore().getHighId(),
(int) stores.getRelationshipTypeTokenStore().getHighId(),
NumberArrayFactory.autoWithPageCacheFallback( stores.getPageCache(), stores.getStoreDir() ) );
}

public CountsComputer( long lastCommittedTransactionId, NodeStore nodes, RelationshipStore relationships,
int highLabelId,
int highRelationshipTypeId )
int highLabelId, int highRelationshipTypeId, NumberArrayFactory numberArrayFactory )
{
this.lastCommittedTransactionId = lastCommittedTransactionId;
this.nodes = nodes;
this.relationships = relationships;
this.highLabelId = highLabelId;
this.highRelationshipTypeId = highRelationshipTypeId;
this.numberArrayFactory = numberArrayFactory;
}

@Override
public void initialize( CountsAccessor.Updater countsUpdater )
{
NodeLabelsCache cache = new NodeLabelsCache( NumberArrayFactory.AUTO, highLabelId );
NodeLabelsCache cache = new NodeLabelsCache( numberArrayFactory, highLabelId );
try
{
// Count nodes
Expand All @@ -80,7 +83,7 @@ public void initialize( CountsAccessor.Updater countsUpdater )
// Count relationships
superviseDynamicExecution(
new RelationshipCountsStage( Configuration.DEFAULT, cache, relationships, highLabelId,
highRelationshipTypeId, countsUpdater, AUTO ) );
highRelationshipTypeId, countsUpdater, numberArrayFactory ) );
}
finally
{
Expand Down
Expand Up @@ -151,6 +151,11 @@ public File getStoreDir()
return storeDir;
}

public PageCache getPageCache()
{
return pageCache;
}

private File getStoreFile( String substoreName )
{
return new File( neoStoreFileName.getPath() + substoreName );
Expand Down
Expand Up @@ -102,6 +102,7 @@
import org.neo4j.unsafe.impl.batchimport.Configuration;
import org.neo4j.unsafe.impl.batchimport.InputIterable;
import org.neo4j.unsafe.impl.batchimport.ParallelBatchImporter;
import org.neo4j.unsafe.impl.batchimport.cache.NumberArrayFactory;
import org.neo4j.unsafe.impl.batchimport.cache.idmapping.IdGenerators;
import org.neo4j.unsafe.impl.batchimport.cache.idmapping.IdMappers;
import org.neo4j.unsafe.impl.batchimport.input.Collectors;
Expand Down Expand Up @@ -421,7 +422,8 @@ private void rebuildCountsFromScratch( File storeDir, long lastTxId, String vers
int highLabelId = (int) neoStores.getLabelTokenStore().getHighId();
int highRelationshipTypeId = (int) neoStores.getRelationshipTypeTokenStore().getHighId();
CountsComputer initializer = new CountsComputer(
lastTxId, nodeStore, relationshipStore, highLabelId, highRelationshipTypeId );
lastTxId, nodeStore, relationshipStore, highLabelId, highRelationshipTypeId,
NumberArrayFactory.autoWithPageCacheFallback( pageCache, storeDir ) );
life.add( new CountsTracker(
logService.getInternalLogProvider(), fileSystem, pageCache, config, storeFileBase )
.setInitializer( initializer ) );
Expand Down
Expand Up @@ -48,6 +48,7 @@
import org.neo4j.unsafe.impl.batchimport.cache.NodeLabelsCache;
import org.neo4j.unsafe.impl.batchimport.cache.NodeRelationshipCache;
import org.neo4j.unsafe.impl.batchimport.cache.NodeType;
import org.neo4j.unsafe.impl.batchimport.cache.NumberArrayFactory;
import org.neo4j.unsafe.impl.batchimport.cache.idmapping.IdGenerator;
import org.neo4j.unsafe.impl.batchimport.cache.idmapping.IdMapper;
import org.neo4j.unsafe.impl.batchimport.input.Collector;
Expand All @@ -66,11 +67,9 @@
import static java.lang.Long.max;
import static java.lang.String.format;
import static java.lang.System.currentTimeMillis;

import static org.neo4j.helpers.Format.bytes;
import static org.neo4j.unsafe.impl.batchimport.AdditionalInitialIds.EMPTY;
import static org.neo4j.unsafe.impl.batchimport.SourceOrCachedInputIterable.cachedForSure;
import static org.neo4j.unsafe.impl.batchimport.cache.NumberArrayFactory.AUTO;
import static org.neo4j.unsafe.impl.batchimport.input.InputCache.MAIN;
import static org.neo4j.unsafe.impl.batchimport.staging.ExecutionSupervisors.superviseExecution;
import static org.neo4j.unsafe.impl.batchimport.staging.ExecutionSupervisors.withDynamicProcessorAssignment;
Expand Down Expand Up @@ -169,12 +168,14 @@ public void doImport( Input input ) throws IOException
neoStore.getLastCommittedTransactionId() );
InputCache inputCache = new InputCache( fileSystem, storeDir, recordFormats, config ) )
{
NumberArrayFactory numberArrayFactory =
NumberArrayFactory.autoWithPageCacheFallback( neoStore.getPageCache(), storeDir );
Collector badCollector = input.badCollector();
// Some temporary caches and indexes in the import
IoMonitor writeMonitor = new IoMonitor( neoStore.getIoTracer() );
IdMapper idMapper = input.idMapper();
IdMapper idMapper = input.idMapper( numberArrayFactory );
IdGenerator idGenerator = input.idGenerator();
nodeRelationshipCache = new NodeRelationshipCache( AUTO, config.denseNodeThreshold() );
nodeRelationshipCache = new NodeRelationshipCache( numberArrayFactory, config.denseNodeThreshold() );
StatsProvider memoryUsageStats = new MemoryUsageStatsProvider( nodeRelationshipCache, idMapper );
InputIterable<InputNode> nodes = input.nodes();
InputIterable<InputRelationship> relationships = input.relationships();
Expand Down Expand Up @@ -225,18 +226,19 @@ public void doImport( Input input ) throws IOException
nodeRelationshipCache = null;

// Defragment relationships groups for better performance
new RelationshipGroupDefragmenter( config, executionMonitor ).run( max( maxMemory, peakMemoryUsage ),
neoStore, highNodeId );
RelationshipGroupDefragmenter groupDefragmenter =
new RelationshipGroupDefragmenter( config, executionMonitor, numberArrayFactory );
groupDefragmenter.run( max( maxMemory, peakMemoryUsage ), neoStore, highNodeId );

// Count nodes per label and labels per node
nodeLabelsCache = new NodeLabelsCache( AUTO, neoStore.getLabelRepository().getHighId() );
nodeLabelsCache = new NodeLabelsCache( numberArrayFactory, neoStore.getLabelRepository().getHighId() );
memoryUsageStats = new MemoryUsageStatsProvider( nodeLabelsCache );
executeStage( new NodeCountsStage( config, nodeLabelsCache, neoStore.getNodeStore(),
neoStore.getLabelRepository().getHighId(), countsUpdater, memoryUsageStats ) );
// Count label-[type]->label
executeStage( new RelationshipCountsStage( config, nodeLabelsCache, relationshipStore,
neoStore.getLabelRepository().getHighId(),
neoStore.getRelationshipTypeRepository().getHighId(), countsUpdater, AUTO ) );
neoStore.getRelationshipTypeRepository().getHighId(), countsUpdater, numberArrayFactory ) );

// We're done, do some final logging about it
long totalTimeMillis = currentTimeMillis() - startTime;
Expand Down
Expand Up @@ -28,9 +28,7 @@
import org.neo4j.unsafe.impl.batchimport.cache.NumberArrayFactory;

import static java.lang.Long.max;

import static org.neo4j.helpers.Format.bytes;
import static org.neo4j.unsafe.impl.batchimport.cache.NumberArrayFactory.AUTO;

/**
* Holds information vital for making {@link RelationshipGroupDefragmenter} work the way it does.
Expand All @@ -54,7 +52,7 @@ public class RelationshipGroupCache implements Iterable<RelationshipGroupRecord>
private final ByteArray groupCountCache;
private final ByteArray cache;
private final long highNodeId;
private final LongArray offsets = AUTO.newDynamicLongArray( 100_000, 0 );
private final LongArray offsets;
private final byte[] scratch = new byte[GROUP_ENTRY_SIZE];
private long fromNodeId;
private long toNodeId;
Expand All @@ -63,6 +61,7 @@ public class RelationshipGroupCache implements Iterable<RelationshipGroupRecord>

public RelationshipGroupCache( NumberArrayFactory arrayFactory, long maxMemory, long highNodeId )
{
this.offsets = arrayFactory.newDynamicLongArray( 100_000, 0 );
this.groupCountCache = arrayFactory.newByteArray( highNodeId, new byte[2] );
this.highNodeId = highNodeId;

Expand Down
Expand Up @@ -23,12 +23,12 @@
import org.neo4j.kernel.impl.store.record.Record;
import org.neo4j.kernel.impl.store.record.RelationshipGroupRecord;
import org.neo4j.unsafe.impl.batchimport.cache.ByteArray;
import org.neo4j.unsafe.impl.batchimport.cache.NumberArrayFactory;
import org.neo4j.unsafe.impl.batchimport.staging.ExecutionMonitor;
import org.neo4j.unsafe.impl.batchimport.staging.Stage;
import org.neo4j.unsafe.impl.batchimport.store.BatchingNeoStores;

import static org.neo4j.unsafe.impl.batchimport.Configuration.withBatchSize;
import static org.neo4j.unsafe.impl.batchimport.cache.NumberArrayFactory.AUTO;
import static org.neo4j.unsafe.impl.batchimport.staging.ExecutionSupervisors.superviseExecution;

/**
Expand All @@ -43,6 +43,9 @@
*/
public class RelationshipGroupDefragmenter
{

private final NumberArrayFactory numberArrayFactory;

public interface Monitor
{
/**
Expand All @@ -65,22 +68,25 @@ default void defragmentingNodeRange( long fromNodeId, long toNodeId )
private final ExecutionMonitor executionMonitor;
private final Monitor monitor;

public RelationshipGroupDefragmenter( Configuration config, ExecutionMonitor executionMonitor )
public RelationshipGroupDefragmenter( Configuration config, ExecutionMonitor executionMonitor,
NumberArrayFactory numberArrayFactory )
{
this( config, executionMonitor, Monitor.EMPTY );
this( config, executionMonitor, Monitor.EMPTY, numberArrayFactory );
}

public RelationshipGroupDefragmenter( Configuration config, ExecutionMonitor executionMonitor, Monitor monitor )
public RelationshipGroupDefragmenter( Configuration config, ExecutionMonitor executionMonitor, Monitor monitor,
NumberArrayFactory numberArrayFactory )
{
this.config = config;
this.executionMonitor = executionMonitor;
this.monitor = monitor;
this.numberArrayFactory = numberArrayFactory;
}

public void run( long memoryWeCanHoldForCertain, BatchingNeoStores neoStore, long highNodeId )
{
try ( RelationshipGroupCache groupCache =
new RelationshipGroupCache( AUTO, memoryWeCanHoldForCertain, highNodeId ) )
new RelationshipGroupCache( numberArrayFactory, memoryWeCanHoldForCertain, highNodeId ) )
{
// Read from the temporary relationship group store...
RecordStore<RelationshipGroupRecord> fromStore = neoStore.getTemporaryRelationshipGroupStore();
Expand Down
@@ -0,0 +1,110 @@
/*
* Copyright (c) 2002-2017 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.unsafe.impl.batchimport.cache;

import static java.lang.Long.min;

/**
* Used as part of the fallback strategy for {@link Auto}. Tries to split up fixed-size arrays
* ({@link #newLongArray(long, long)} and {@link #newIntArray(long, int)} into smaller chunks where
* some can live on heap and some off heap.
*/
public class ChunkedNumberArrayFactory extends NumberArrayFactory.Adapter
{
private final NumberArrayFactory delegate;

public ChunkedNumberArrayFactory()
{
this( OFF_HEAP, HEAP );
}

public ChunkedNumberArrayFactory( NumberArrayFactory... delegateList )
{
delegate = new Auto( delegateList );
}

@Override
public LongArray newLongArray( long length, long defaultValue, long base )
{
// Here we want to have the property of a dynamic array which makes some parts of the array
// live on heap, some off. At the same time we want a fixed size array. Therefore first create
// the array as a dynamic array and make it grow to the requested length.
LongArray array = newDynamicLongArray( fractionOf( length ), defaultValue );
array.at( length - 1 );
return array;
}

@Override
public IntArray newIntArray( long length, int defaultValue, long base )
{
// Here we want to have the property of a dynamic array which makes some parts of the array
// live on heap, some off. At the same time we want a fixed size array. Therefore first create
// the array as a dynamic array and make it grow to the requested length.
IntArray array = newDynamicIntArray( fractionOf( length ), defaultValue );
array.at( length - 1 );
return array;
}

@Override
public ByteArray newByteArray( long length, byte[] defaultValue, long base )
{
// Here we want to have the property of a dynamic array which makes some parts of the array
// live on heap, some off. At the same time we want a fixed size array. Therefore first create
// the array as a dynamic array and make it grow to the requested length.
ByteArray array = newDynamicByteArray( fractionOf( length ), defaultValue );
array.at( length - 1 );
return array;
}

private long fractionOf( long length )
{
int idealChunkCount = 10;
if ( length < idealChunkCount )
{
return length;
}
int maxArraySize = Integer.MAX_VALUE - Short.MAX_VALUE;
return min( length / idealChunkCount, maxArraySize );
}

@Override
public IntArray newDynamicIntArray( long chunkSize, int defaultValue )
{
return new DynamicIntArray( delegate, chunkSize, defaultValue );
}

@Override
public LongArray newDynamicLongArray( long chunkSize, long defaultValue )
{
return new DynamicLongArray( delegate, chunkSize, defaultValue );
}

@Override
public ByteArray newDynamicByteArray( long chunkSize, byte[] defaultValue )
{
return new DynamicByteArray( delegate, chunkSize, defaultValue );
}

@Override
public String toString()
{
return "CHUNKED_FIXED_SIZE";
}
}
Expand Up @@ -37,18 +37,14 @@ public DynamicByteArray( NumberArrayFactory factory, long chunkSize, byte[] defa
}

@Override
public void swap( long fromIndex, long toIndex, int numberOfEntries )
{
// Let's just do this the stupid way. There's room for optimization here
byte[] intermediary = defaultValue.clone();
byte[] transport = defaultValue.clone();
for ( int i = 0; i < numberOfEntries; i++ )
{
get( fromIndex + i, intermediary );
get( toIndex + i, transport );
set( fromIndex + i, transport );
set( toIndex + i, intermediary );
}
public void swap( long fromIndex, long toIndex )
{
byte[] a = defaultValue.clone();
byte[] b = defaultValue.clone();
get( fromIndex, a );
get( toIndex, b );
set( fromIndex, b );
set( toIndex, a );
}

@Override
Expand Down

0 comments on commit 7d29f1b

Please sign in to comment.