From 7d29f1b2ca7c2b000c0d03a66cbfade2cda0899e Mon Sep 17 00:00:00 2001 From: Chris Vest Date: Fri, 14 Jul 2017 14:58:36 +0200 Subject: [PATCH] PBI: Page cache backed NumberArrayFactory, first draft This adds a new NumberArrayFactory that builds number arrays that use the page cache as backing memory. This is only used a the last fall-back for the ChunkedNumberArrayFactory for allocating chunks, so it will be used as little as possible. However, it will allow the importer to import graphs on systems where for instance the IdMapper cannot fit in the available memory of the system. There is one outstanding issue in this commit, and that is that the NumberArrays can be exposed to concurrent access from multiple threads. The PageCache backed NumberArray implementations are currently single-threaded, becuase they reuse the same pre-allocated PageCursors over and over again. If the array is accessed concurrently, then this can cause a cursor to move while another thread is reading from the pinned page. --- .../org/neo4j/tooling/DataGeneratorInput.java | 5 +- .../kernel/impl/store/CountsComputer.java | 19 +- .../neo4j/kernel/impl/store/NeoStores.java | 5 + .../participant/StoreMigrator.java | 4 +- .../batchimport/ParallelBatchImporter.java | 18 +- .../batchimport/RelationshipGroupCache.java | 5 +- .../RelationshipGroupDefragmenter.java | 16 +- .../cache/ChunkedNumberArrayFactory.java | 110 ++++++ .../batchimport/cache/DynamicByteArray.java | 20 +- .../impl/batchimport/cache/HeapByteArray.java | 14 +- .../impl/batchimport/cache/HeapIntArray.java | 11 - .../impl/batchimport/cache/HeapLongArray.java | 11 - .../impl/batchimport/cache/IntArray.java | 12 +- .../impl/batchimport/cache/LongArray.java | 12 +- .../impl/batchimport/cache/NumberArray.java | 6 +- .../batchimport/cache/NumberArrayFactory.java | 81 +--- .../batchimport/cache/OffHeapByteArray.java | 11 +- .../batchimport/cache/OffHeapIntArray.java | 14 - .../batchimport/cache/OffHeapLongArray.java | 14 - .../batchimport/cache/PageCacheByteArray.java | 351 ++++++++++++++++++ .../batchimport/cache/PageCacheIntArray.java | 44 +-- .../batchimport/cache/PageCacheLongArray.java | 44 +-- .../cache/PageCacheNumberArray.java | 138 +++++-- .../cache/PageCachedNumberArrayFactory.java | 87 +++++ .../idmapping/string/AbstractTracker.java | 4 +- .../idmapping/string/EncodingIdMapper.java | 2 +- .../cache/idmapping/string/ParallelSort.java | 6 +- .../cache/idmapping/string/Tracker.java | 3 +- .../unsafe/impl/batchimport/input/Input.java | 4 +- .../unsafe/impl/batchimport/input/Inputs.java | 3 +- .../impl/batchimport/input/csv/CsvInput.java | 5 +- .../impl/batchimport/input/csv/IdType.java | 15 +- .../batchimport/store/BatchingNeoStores.java | 5 + .../impl/store/counts/CountsComputerTest.java | 4 +- .../RelationshipGroupDefragmenterTest.java | 3 +- .../impl/batchimport/cache/ByteArrayTest.java | 119 +++++- .../impl/batchimport/cache/IntArrayTest.java | 58 ++- .../impl/batchimport/cache/LongArrayTest.java | 47 ++- .../NumberArrayPageCacheTestSupport.java | 63 ++++ .../batchimport/cache/NumberArrayTest.java | 54 ++- .../cache/PageCacheLongArrayTest.java | 64 ++-- .../MultipleIndexPopulationStressIT.java | 3 +- .../checkpoint/NodeCountInputs.java | 3 +- 43 files changed, 1132 insertions(+), 385 deletions(-) create mode 100644 community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/cache/ChunkedNumberArrayFactory.java create mode 100644 community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/cache/PageCacheByteArray.java create mode 100644 community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/cache/PageCachedNumberArrayFactory.java create mode 100644 community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/cache/NumberArrayPageCacheTestSupport.java diff --git a/community/import-tool/src/test/java/org/neo4j/tooling/DataGeneratorInput.java b/community/import-tool/src/test/java/org/neo4j/tooling/DataGeneratorInput.java index 3c46c52d011a2..894e72a6e8061 100644 --- a/community/import-tool/src/test/java/org/neo4j/tooling/DataGeneratorInput.java +++ b/community/import-tool/src/test/java/org/neo4j/tooling/DataGeneratorInput.java @@ -24,6 +24,7 @@ import org.neo4j.unsafe.impl.batchimport.IdRangeInput.Range; import org.neo4j.unsafe.impl.batchimport.InputIterable; import org.neo4j.unsafe.impl.batchimport.InputIterator; +import org.neo4j.unsafe.impl.batchimport.cache.NumberArrayFactory; import org.neo4j.unsafe.impl.batchimport.cache.idmapping.IdGenerator; import org.neo4j.unsafe.impl.batchimport.cache.idmapping.IdMapper; import org.neo4j.unsafe.impl.batchimport.input.Collector; @@ -122,9 +123,9 @@ public boolean supportsMultiplePasses() } @Override - public IdMapper idMapper() + public IdMapper idMapper( NumberArrayFactory numberArrayFactory ) { - return idType.idMapper(); + return idType.idMapper( numberArrayFactory ); } @Override diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/store/CountsComputer.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/store/CountsComputer.java index 46f7941a93d5d..f76ddf1c85ba7 100644 --- a/community/kernel/src/main/java/org/neo4j/kernel/impl/store/CountsComputer.java +++ b/community/kernel/src/main/java/org/neo4j/kernel/impl/store/CountsComputer.java @@ -28,11 +28,13 @@ import org.neo4j.unsafe.impl.batchimport.cache.NodeLabelsCache; import org.neo4j.unsafe.impl.batchimport.cache.NumberArrayFactory; -import static org.neo4j.unsafe.impl.batchimport.cache.NumberArrayFactory.AUTO; import static org.neo4j.unsafe.impl.batchimport.staging.ExecutionSupervisors.superviseDynamicExecution; public class CountsComputer implements DataInitializer { + + private final NumberArrayFactory numberArrayFactory; + public static void recomputeCounts( NeoStores stores ) { MetaDataStore metaDataStore = stores.getMetaDataStore(); @@ -52,26 +54,27 @@ public static void recomputeCounts( NeoStores stores ) public CountsComputer( NeoStores stores ) { this( stores.getMetaDataStore().getLastCommittedTransactionId(), - stores.getNodeStore(), stores.getRelationshipStore(), - (int) stores.getLabelTokenStore().getHighId(), - (int) stores.getRelationshipTypeTokenStore().getHighId() ); + stores.getNodeStore(), stores.getRelationshipStore(), + (int) stores.getLabelTokenStore().getHighId(), + (int) stores.getRelationshipTypeTokenStore().getHighId(), + NumberArrayFactory.autoWithPageCacheFallback( stores.getPageCache(), stores.getStoreDir() ) ); } public CountsComputer( long lastCommittedTransactionId, NodeStore nodes, RelationshipStore relationships, - int highLabelId, - int highRelationshipTypeId ) + int highLabelId, int highRelationshipTypeId, NumberArrayFactory numberArrayFactory ) { this.lastCommittedTransactionId = lastCommittedTransactionId; this.nodes = nodes; this.relationships = relationships; this.highLabelId = highLabelId; this.highRelationshipTypeId = highRelationshipTypeId; + this.numberArrayFactory = numberArrayFactory; } @Override public void initialize( CountsAccessor.Updater countsUpdater ) { - NodeLabelsCache cache = new NodeLabelsCache( NumberArrayFactory.AUTO, highLabelId ); + NodeLabelsCache cache = new NodeLabelsCache( numberArrayFactory, highLabelId ); try { // Count nodes @@ -80,7 +83,7 @@ public void initialize( CountsAccessor.Updater countsUpdater ) // Count relationships superviseDynamicExecution( new RelationshipCountsStage( Configuration.DEFAULT, cache, relationships, highLabelId, - highRelationshipTypeId, countsUpdater, AUTO ) ); + highRelationshipTypeId, countsUpdater, numberArrayFactory ) ); } finally { diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/store/NeoStores.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/store/NeoStores.java index ef461100f4b85..d884afbee703e 100644 --- a/community/kernel/src/main/java/org/neo4j/kernel/impl/store/NeoStores.java +++ b/community/kernel/src/main/java/org/neo4j/kernel/impl/store/NeoStores.java @@ -151,6 +151,11 @@ public File getStoreDir() return storeDir; } + public PageCache getPageCache() + { + return pageCache; + } + private File getStoreFile( String substoreName ) { return new File( neoStoreFileName.getPath() + substoreName ); diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/storemigration/participant/StoreMigrator.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/storemigration/participant/StoreMigrator.java index d59e35d99ed7c..29c1e468b3cdc 100644 --- a/community/kernel/src/main/java/org/neo4j/kernel/impl/storemigration/participant/StoreMigrator.java +++ b/community/kernel/src/main/java/org/neo4j/kernel/impl/storemigration/participant/StoreMigrator.java @@ -102,6 +102,7 @@ import org.neo4j.unsafe.impl.batchimport.Configuration; import org.neo4j.unsafe.impl.batchimport.InputIterable; import org.neo4j.unsafe.impl.batchimport.ParallelBatchImporter; +import org.neo4j.unsafe.impl.batchimport.cache.NumberArrayFactory; import org.neo4j.unsafe.impl.batchimport.cache.idmapping.IdGenerators; import org.neo4j.unsafe.impl.batchimport.cache.idmapping.IdMappers; import org.neo4j.unsafe.impl.batchimport.input.Collectors; @@ -421,7 +422,8 @@ private void rebuildCountsFromScratch( File storeDir, long lastTxId, String vers int highLabelId = (int) neoStores.getLabelTokenStore().getHighId(); int highRelationshipTypeId = (int) neoStores.getRelationshipTypeTokenStore().getHighId(); CountsComputer initializer = new CountsComputer( - lastTxId, nodeStore, relationshipStore, highLabelId, highRelationshipTypeId ); + lastTxId, nodeStore, relationshipStore, highLabelId, highRelationshipTypeId, + NumberArrayFactory.autoWithPageCacheFallback( pageCache, storeDir ) ); life.add( new CountsTracker( logService.getInternalLogProvider(), fileSystem, pageCache, config, storeFileBase ) .setInitializer( initializer ) ); diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/ParallelBatchImporter.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/ParallelBatchImporter.java index 4e36be3464269..a5e336a16019a 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/ParallelBatchImporter.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/ParallelBatchImporter.java @@ -48,6 +48,7 @@ import org.neo4j.unsafe.impl.batchimport.cache.NodeLabelsCache; import org.neo4j.unsafe.impl.batchimport.cache.NodeRelationshipCache; import org.neo4j.unsafe.impl.batchimport.cache.NodeType; +import org.neo4j.unsafe.impl.batchimport.cache.NumberArrayFactory; import org.neo4j.unsafe.impl.batchimport.cache.idmapping.IdGenerator; import org.neo4j.unsafe.impl.batchimport.cache.idmapping.IdMapper; import org.neo4j.unsafe.impl.batchimport.input.Collector; @@ -66,11 +67,9 @@ import static java.lang.Long.max; import static java.lang.String.format; import static java.lang.System.currentTimeMillis; - import static org.neo4j.helpers.Format.bytes; import static org.neo4j.unsafe.impl.batchimport.AdditionalInitialIds.EMPTY; import static org.neo4j.unsafe.impl.batchimport.SourceOrCachedInputIterable.cachedForSure; -import static org.neo4j.unsafe.impl.batchimport.cache.NumberArrayFactory.AUTO; import static org.neo4j.unsafe.impl.batchimport.input.InputCache.MAIN; import static org.neo4j.unsafe.impl.batchimport.staging.ExecutionSupervisors.superviseExecution; import static org.neo4j.unsafe.impl.batchimport.staging.ExecutionSupervisors.withDynamicProcessorAssignment; @@ -169,12 +168,14 @@ public void doImport( Input input ) throws IOException neoStore.getLastCommittedTransactionId() ); InputCache inputCache = new InputCache( fileSystem, storeDir, recordFormats, config ) ) { + NumberArrayFactory numberArrayFactory = + NumberArrayFactory.autoWithPageCacheFallback( neoStore.getPageCache(), storeDir ); Collector badCollector = input.badCollector(); // Some temporary caches and indexes in the import IoMonitor writeMonitor = new IoMonitor( neoStore.getIoTracer() ); - IdMapper idMapper = input.idMapper(); + IdMapper idMapper = input.idMapper( numberArrayFactory ); IdGenerator idGenerator = input.idGenerator(); - nodeRelationshipCache = new NodeRelationshipCache( AUTO, config.denseNodeThreshold() ); + nodeRelationshipCache = new NodeRelationshipCache( numberArrayFactory, config.denseNodeThreshold() ); StatsProvider memoryUsageStats = new MemoryUsageStatsProvider( nodeRelationshipCache, idMapper ); InputIterable nodes = input.nodes(); InputIterable relationships = input.relationships(); @@ -225,18 +226,19 @@ public void doImport( Input input ) throws IOException nodeRelationshipCache = null; // Defragment relationships groups for better performance - new RelationshipGroupDefragmenter( config, executionMonitor ).run( max( maxMemory, peakMemoryUsage ), - neoStore, highNodeId ); + RelationshipGroupDefragmenter groupDefragmenter = + new RelationshipGroupDefragmenter( config, executionMonitor, numberArrayFactory ); + groupDefragmenter.run( max( maxMemory, peakMemoryUsage ), neoStore, highNodeId ); // Count nodes per label and labels per node - nodeLabelsCache = new NodeLabelsCache( AUTO, neoStore.getLabelRepository().getHighId() ); + nodeLabelsCache = new NodeLabelsCache( numberArrayFactory, neoStore.getLabelRepository().getHighId() ); memoryUsageStats = new MemoryUsageStatsProvider( nodeLabelsCache ); executeStage( new NodeCountsStage( config, nodeLabelsCache, neoStore.getNodeStore(), neoStore.getLabelRepository().getHighId(), countsUpdater, memoryUsageStats ) ); // Count label-[type]->label executeStage( new RelationshipCountsStage( config, nodeLabelsCache, relationshipStore, neoStore.getLabelRepository().getHighId(), - neoStore.getRelationshipTypeRepository().getHighId(), countsUpdater, AUTO ) ); + neoStore.getRelationshipTypeRepository().getHighId(), countsUpdater, numberArrayFactory ) ); // We're done, do some final logging about it long totalTimeMillis = currentTimeMillis() - startTime; diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/RelationshipGroupCache.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/RelationshipGroupCache.java index 39d2f69380622..cd4421ce09f0b 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/RelationshipGroupCache.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/RelationshipGroupCache.java @@ -28,9 +28,7 @@ import org.neo4j.unsafe.impl.batchimport.cache.NumberArrayFactory; import static java.lang.Long.max; - import static org.neo4j.helpers.Format.bytes; -import static org.neo4j.unsafe.impl.batchimport.cache.NumberArrayFactory.AUTO; /** * Holds information vital for making {@link RelationshipGroupDefragmenter} work the way it does. @@ -54,7 +52,7 @@ public class RelationshipGroupCache implements Iterable private final ByteArray groupCountCache; private final ByteArray cache; private final long highNodeId; - private final LongArray offsets = AUTO.newDynamicLongArray( 100_000, 0 ); + private final LongArray offsets; private final byte[] scratch = new byte[GROUP_ENTRY_SIZE]; private long fromNodeId; private long toNodeId; @@ -63,6 +61,7 @@ public class RelationshipGroupCache implements Iterable public RelationshipGroupCache( NumberArrayFactory arrayFactory, long maxMemory, long highNodeId ) { + this.offsets = arrayFactory.newDynamicLongArray( 100_000, 0 ); this.groupCountCache = arrayFactory.newByteArray( highNodeId, new byte[2] ); this.highNodeId = highNodeId; diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/RelationshipGroupDefragmenter.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/RelationshipGroupDefragmenter.java index 8be6b78da3875..4459c9c752e63 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/RelationshipGroupDefragmenter.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/RelationshipGroupDefragmenter.java @@ -23,12 +23,12 @@ import org.neo4j.kernel.impl.store.record.Record; import org.neo4j.kernel.impl.store.record.RelationshipGroupRecord; import org.neo4j.unsafe.impl.batchimport.cache.ByteArray; +import org.neo4j.unsafe.impl.batchimport.cache.NumberArrayFactory; import org.neo4j.unsafe.impl.batchimport.staging.ExecutionMonitor; import org.neo4j.unsafe.impl.batchimport.staging.Stage; import org.neo4j.unsafe.impl.batchimport.store.BatchingNeoStores; import static org.neo4j.unsafe.impl.batchimport.Configuration.withBatchSize; -import static org.neo4j.unsafe.impl.batchimport.cache.NumberArrayFactory.AUTO; import static org.neo4j.unsafe.impl.batchimport.staging.ExecutionSupervisors.superviseExecution; /** @@ -43,6 +43,9 @@ */ public class RelationshipGroupDefragmenter { + + private final NumberArrayFactory numberArrayFactory; + public interface Monitor { /** @@ -65,22 +68,25 @@ default void defragmentingNodeRange( long fromNodeId, long toNodeId ) private final ExecutionMonitor executionMonitor; private final Monitor monitor; - public RelationshipGroupDefragmenter( Configuration config, ExecutionMonitor executionMonitor ) + public RelationshipGroupDefragmenter( Configuration config, ExecutionMonitor executionMonitor, + NumberArrayFactory numberArrayFactory ) { - this( config, executionMonitor, Monitor.EMPTY ); + this( config, executionMonitor, Monitor.EMPTY, numberArrayFactory ); } - public RelationshipGroupDefragmenter( Configuration config, ExecutionMonitor executionMonitor, Monitor monitor ) + public RelationshipGroupDefragmenter( Configuration config, ExecutionMonitor executionMonitor, Monitor monitor, + NumberArrayFactory numberArrayFactory ) { this.config = config; this.executionMonitor = executionMonitor; this.monitor = monitor; + this.numberArrayFactory = numberArrayFactory; } public void run( long memoryWeCanHoldForCertain, BatchingNeoStores neoStore, long highNodeId ) { try ( RelationshipGroupCache groupCache = - new RelationshipGroupCache( AUTO, memoryWeCanHoldForCertain, highNodeId ) ) + new RelationshipGroupCache( numberArrayFactory, memoryWeCanHoldForCertain, highNodeId ) ) { // Read from the temporary relationship group store... RecordStore fromStore = neoStore.getTemporaryRelationshipGroupStore(); diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/cache/ChunkedNumberArrayFactory.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/cache/ChunkedNumberArrayFactory.java new file mode 100644 index 0000000000000..045c05587f5f0 --- /dev/null +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/cache/ChunkedNumberArrayFactory.java @@ -0,0 +1,110 @@ +/* + * Copyright (c) 2002-2017 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ +package org.neo4j.unsafe.impl.batchimport.cache; + +import static java.lang.Long.min; + +/** + * Used as part of the fallback strategy for {@link Auto}. Tries to split up fixed-size arrays + * ({@link #newLongArray(long, long)} and {@link #newIntArray(long, int)} into smaller chunks where + * some can live on heap and some off heap. + */ +public class ChunkedNumberArrayFactory extends NumberArrayFactory.Adapter +{ + private final NumberArrayFactory delegate; + + public ChunkedNumberArrayFactory() + { + this( OFF_HEAP, HEAP ); + } + + public ChunkedNumberArrayFactory( NumberArrayFactory... delegateList ) + { + delegate = new Auto( delegateList ); + } + + @Override + public LongArray newLongArray( long length, long defaultValue, long base ) + { + // Here we want to have the property of a dynamic array which makes some parts of the array + // live on heap, some off. At the same time we want a fixed size array. Therefore first create + // the array as a dynamic array and make it grow to the requested length. + LongArray array = newDynamicLongArray( fractionOf( length ), defaultValue ); + array.at( length - 1 ); + return array; + } + + @Override + public IntArray newIntArray( long length, int defaultValue, long base ) + { + // Here we want to have the property of a dynamic array which makes some parts of the array + // live on heap, some off. At the same time we want a fixed size array. Therefore first create + // the array as a dynamic array and make it grow to the requested length. + IntArray array = newDynamicIntArray( fractionOf( length ), defaultValue ); + array.at( length - 1 ); + return array; + } + + @Override + public ByteArray newByteArray( long length, byte[] defaultValue, long base ) + { + // Here we want to have the property of a dynamic array which makes some parts of the array + // live on heap, some off. At the same time we want a fixed size array. Therefore first create + // the array as a dynamic array and make it grow to the requested length. + ByteArray array = newDynamicByteArray( fractionOf( length ), defaultValue ); + array.at( length - 1 ); + return array; + } + + private long fractionOf( long length ) + { + int idealChunkCount = 10; + if ( length < idealChunkCount ) + { + return length; + } + int maxArraySize = Integer.MAX_VALUE - Short.MAX_VALUE; + return min( length / idealChunkCount, maxArraySize ); + } + + @Override + public IntArray newDynamicIntArray( long chunkSize, int defaultValue ) + { + return new DynamicIntArray( delegate, chunkSize, defaultValue ); + } + + @Override + public LongArray newDynamicLongArray( long chunkSize, long defaultValue ) + { + return new DynamicLongArray( delegate, chunkSize, defaultValue ); + } + + @Override + public ByteArray newDynamicByteArray( long chunkSize, byte[] defaultValue ) + { + return new DynamicByteArray( delegate, chunkSize, defaultValue ); + } + + @Override + public String toString() + { + return "CHUNKED_FIXED_SIZE"; + } +} diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/cache/DynamicByteArray.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/cache/DynamicByteArray.java index 5307c37f37b0f..1b650f1e1a740 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/cache/DynamicByteArray.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/cache/DynamicByteArray.java @@ -37,18 +37,14 @@ public DynamicByteArray( NumberArrayFactory factory, long chunkSize, byte[] defa } @Override - public void swap( long fromIndex, long toIndex, int numberOfEntries ) - { - // Let's just do this the stupid way. There's room for optimization here - byte[] intermediary = defaultValue.clone(); - byte[] transport = defaultValue.clone(); - for ( int i = 0; i < numberOfEntries; i++ ) - { - get( fromIndex + i, intermediary ); - get( toIndex + i, transport ); - set( fromIndex + i, transport ); - set( toIndex + i, intermediary ); - } + public void swap( long fromIndex, long toIndex ) + { + byte[] a = defaultValue.clone(); + byte[] b = defaultValue.clone(); + get( fromIndex, a ); + get( toIndex, b ); + set( fromIndex, b ); + set( toIndex, a ); } @Override diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/cache/HeapByteArray.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/cache/HeapByteArray.java index 7c0f74ef8818f..17a2f32c6fd1e 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/cache/HeapByteArray.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/cache/HeapByteArray.java @@ -50,12 +50,14 @@ public long length() } @Override - public void swap( long fromIndex, long toIndex, int numberOfEntries ) - { - byte[] intermediary = new byte[numberOfEntries * itemSize]; - System.arraycopy( array, index( toIndex, 0 ), intermediary, 0, intermediary.length ); - System.arraycopy( array, index( fromIndex, 0 ), array, index( toIndex, 0 ), intermediary.length ); - System.arraycopy( intermediary, 0, array, index( fromIndex, 0 ), intermediary.length ); + public void swap( long fromIndex, long toIndex ) + { + byte[] a = defaultValue.clone(); + byte[] b = defaultValue.clone(); + get( fromIndex, a ); + get( toIndex, b ); + set( fromIndex, b ); + set( toIndex, a ); } @Override diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/cache/HeapIntArray.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/cache/HeapIntArray.java index 227645ffe66cf..2edb4d3d4bd01 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/cache/HeapIntArray.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/cache/HeapIntArray.java @@ -60,15 +60,4 @@ public void clear() { Arrays.fill( array, defaultValue ); } - - @Override - public void swap( long fromIndex, long toIndex, int numberOfEntries ) - { - for ( int i = 0; i < numberOfEntries; i++ ) - { - int fromValue = get( fromIndex + i ); - set( fromIndex + i, get( toIndex + i ) ); - set( toIndex + i, fromValue ); - } - } } diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/cache/HeapLongArray.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/cache/HeapLongArray.java index 99d55235ee3d0..1062e515ee341 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/cache/HeapLongArray.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/cache/HeapLongArray.java @@ -60,15 +60,4 @@ public void clear() { Arrays.fill( array, defaultValue ); } - - @Override - public void swap( long fromIndex, long toIndex, int numberOfEntries ) - { - for ( int i = 0; i < numberOfEntries; i++ ) - { - long fromValue = get( fromIndex + i ); - set( fromIndex + i, get( toIndex + i ) ); - set( toIndex + i, fromValue ); - } - } } diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/cache/IntArray.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/cache/IntArray.java index b5283e5c761bf..7f3105d2dd5cd 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/cache/IntArray.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/cache/IntArray.java @@ -32,14 +32,10 @@ public interface IntArray extends NumberArray void set( long index, int value ); @Override - default void swap( long fromIndex, long toIndex, int numberOfEntries ) + default void swap( long fromIndex, long toIndex ) { - // Let's just do this the stupid way. There's room for optimization here - for ( int i = 0; i < numberOfEntries; i++ ) - { - int intermediary = get( fromIndex+i ); - set( fromIndex+i, get( toIndex+i ) ); - set( toIndex+i, intermediary ); - } + int intermediary = get( fromIndex ); + set( fromIndex, get( toIndex ) ); + set( toIndex, intermediary ); } } diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/cache/LongArray.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/cache/LongArray.java index bfae1c6e1bc31..a86666235b60b 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/cache/LongArray.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/cache/LongArray.java @@ -32,14 +32,10 @@ public interface LongArray extends NumberArray void set( long index, long value ); @Override - default void swap( long fromIndex, long toIndex, int numberOfEntries ) + default void swap( long fromIndex, long toIndex ) { - // Let's just do this the stupid way. There's room for optimization here - for ( int i = 0; i < numberOfEntries; i++ ) - { - long intermediary = get( fromIndex+i ); - set( fromIndex+i, get( toIndex+i ) ); - set( toIndex+i, intermediary ); - } + long intermediary = get( fromIndex ); + set( fromIndex, get( toIndex ) ); + set( toIndex, intermediary ); } } diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/cache/NumberArray.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/cache/NumberArray.java index 7d007645d0385..75bd9eeec91fe 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/cache/NumberArray.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/cache/NumberArray.java @@ -34,12 +34,10 @@ public interface NumberArray> extends MemoryStatsVisito /** * Swaps {@code numberOfEntries} items from {@code fromIndex} to {@code toIndex}, such that * {@code fromIndex} and {@code toIndex}, {@code fromIndex+1} and {@code toIndex} a.s.o swaps places. - * - * @param fromIndex where to start swapping from. + * @param fromIndex where to start swapping from. * @param toIndex where to start swapping to. - * @param numberOfEntries number of entries to swap, starting from the given from/to indexes. */ - void swap( long fromIndex, long toIndex, int numberOfEntries ); + void swap( long fromIndex, long toIndex ); /** * Sets all values to a default value. diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/cache/NumberArrayFactory.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/cache/NumberArrayFactory.java index 40756391851e4..423b94ff21b86 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/cache/NumberArrayFactory.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/cache/NumberArrayFactory.java @@ -19,12 +19,12 @@ */ package org.neo4j.unsafe.impl.batchimport.cache; +import java.io.File; import java.util.Arrays; import org.neo4j.helpers.Exceptions; +import org.neo4j.io.pagecache.PageCache; -import static java.lang.Long.min; -import static java.lang.Math.toIntExact; import static java.lang.String.format; import static org.neo4j.helpers.Exceptions.launderedException; import static org.neo4j.helpers.Format.bytes; @@ -276,76 +276,19 @@ private Throwable error( long length, int itemSize, Throwable error ) * ({@link #newLongArray(long, long)} and {@link #newIntArray(long, int)} into smaller chunks where * some can live on heap and some off heap. */ - NumberArrayFactory CHUNKED_FIXED_SIZE = new Adapter() - { - private final NumberArrayFactory delegate = new Auto( OFF_HEAP, HEAP ); - - @Override - public LongArray newLongArray( long length, long defaultValue, long base ) - { - // Here we want to have the property of a dynamic array which makes some parts of the array - // live on heap, some off. At the same time we want a fixed size array. Therefore first create - // the array as a dynamic array and make it grow to the requested length. - LongArray array = newDynamicLongArray( fractionOf( length ), defaultValue ); - array.at( length - 1 ); - return array; - } - - @Override - public IntArray newIntArray( long length, int defaultValue, long base ) - { - // Here we want to have the property of a dynamic array which makes some parts of the array - // live on heap, some off. At the same time we want a fixed size array. Therefore first create - // the array as a dynamic array and make it grow to the requested length. - IntArray array = newDynamicIntArray( fractionOf( length ), defaultValue ); - array.at( length - 1 ); - return array; - } - - @Override - public ByteArray newByteArray( long length, byte[] defaultValue, long base ) - { - // Here we want to have the property of a dynamic array which makes some parts of the array - // live on heap, some off. At the same time we want a fixed size array. Therefore first create - // the array as a dynamic array and make it grow to the requested length. - ByteArray array = newDynamicByteArray( fractionOf( length ), defaultValue ); - array.at( length - 1 ); - return array; - } - - private long fractionOf( long length ) - { - int maxArraySize = Integer.MAX_VALUE - Short.MAX_VALUE; - return min( length / 10, maxArraySize ); - } - - @Override - public IntArray newDynamicIntArray( long chunkSize, int defaultValue ) - { - return new DynamicIntArray( delegate, chunkSize, defaultValue ); - } - - @Override - public LongArray newDynamicLongArray( long chunkSize, long defaultValue ) - { - return new DynamicLongArray( delegate, chunkSize, defaultValue ); - } - - @Override - public ByteArray newDynamicByteArray( long chunkSize, byte[] defaultValue ) - { - return new DynamicByteArray( delegate, chunkSize, defaultValue ); - } - - @Override - public String toString() - { - return "CHUNKED_FIXED_SIZE"; - } - }; + NumberArrayFactory CHUNKED_FIXED_SIZE = new ChunkedNumberArrayFactory(); /** * {@link Auto} factory which uses JVM stats for gathering information about available memory. */ NumberArrayFactory AUTO = new Auto( OFF_HEAP, HEAP, CHUNKED_FIXED_SIZE ); + + static NumberArrayFactory autoWithPageCacheFallback( PageCache pageCache, File dir ) + { + PageCachedNumberArrayFactory pagedArrayFactory = + new PageCachedNumberArrayFactory( pageCache, dir ); + ChunkedNumberArrayFactory chunkedArrayFactory = + new ChunkedNumberArrayFactory( OFF_HEAP, HEAP, pagedArrayFactory ); + return new Auto( OFF_HEAP, HEAP, chunkedArrayFactory ); + } } diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/cache/OffHeapByteArray.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/cache/OffHeapByteArray.java index b493d89af11c0..acf12b4ade97c 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/cache/OffHeapByteArray.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/cache/OffHeapByteArray.java @@ -33,13 +33,12 @@ protected OffHeapByteArray( long length, byte[] defaultValue, long base ) } @Override - public void swap( long fromIndex, long toIndex, int numberOfEntries ) + public void swap( long fromIndex, long toIndex ) { - int size = numberOfEntries * itemSize; - long intermediary = UnsafeUtil.allocateMemory( size ); - UnsafeUtil.copyMemory( address( fromIndex, 0 ), intermediary, size ); - UnsafeUtil.copyMemory( address( toIndex, 0 ), address( fromIndex, 0 ), size ); - UnsafeUtil.copyMemory( intermediary, address( toIndex, 0 ), size ); + long intermediary = UnsafeUtil.allocateMemory( itemSize ); + UnsafeUtil.copyMemory( address( fromIndex, 0 ), intermediary, itemSize ); + UnsafeUtil.copyMemory( address( toIndex, 0 ), address( fromIndex, 0 ), itemSize ); + UnsafeUtil.copyMemory( intermediary, address( toIndex, 0 ), itemSize ); UnsafeUtil.free( intermediary ); } diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/cache/OffHeapIntArray.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/cache/OffHeapIntArray.java index 2457912f31213..7a183e67fc26c 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/cache/OffHeapIntArray.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/cache/OffHeapIntArray.java @@ -63,18 +63,4 @@ public void clear() } } } - - @Override - public void swap( long fromIndex, long toIndex, int numberOfEntries ) - { - long fromAddress = addressOf( fromIndex ); - long toAddress = addressOf( toIndex ); - - for ( int i = 0; i < numberOfEntries; i++, fromAddress += itemSize, toAddress += itemSize ) - { - int fromValue = UnsafeUtil.getInt( fromAddress ); - UnsafeUtil.putInt( fromAddress, UnsafeUtil.getInt( toAddress ) ); - UnsafeUtil.putInt( toAddress, fromValue ); - } - } } diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/cache/OffHeapLongArray.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/cache/OffHeapLongArray.java index dc28b6ccbad74..be367e107a4f7 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/cache/OffHeapLongArray.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/cache/OffHeapLongArray.java @@ -63,18 +63,4 @@ public void clear() } } } - - @Override - public void swap( long fromIndex, long toIndex, int numberOfEntries ) - { - long fromAddress = addressOf( fromIndex ); - long toAddress = addressOf( toIndex ); - - for ( int i = 0; i < numberOfEntries; i++, fromAddress += itemSize, toAddress += itemSize ) - { - long fromValue = UnsafeUtil.getLong( fromAddress ); - UnsafeUtil.putLong( fromAddress, UnsafeUtil.getLong( toAddress ) ); - UnsafeUtil.putLong( toAddress, fromValue ); - } - } } diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/cache/PageCacheByteArray.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/cache/PageCacheByteArray.java new file mode 100644 index 0000000000000..36a761b8d4e9f --- /dev/null +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/cache/PageCacheByteArray.java @@ -0,0 +1,351 @@ +/* + * Copyright (c) 2002-2017 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ +package org.neo4j.unsafe.impl.batchimport.cache; + +import java.io.IOException; +import java.io.UncheckedIOException; + +import org.neo4j.io.pagecache.PageCursor; +import org.neo4j.io.pagecache.PagedFile; + +public class PageCacheByteArray extends PageCacheNumberArray implements ByteArray +{ + private final byte[] defaultValue; + + public PageCacheByteArray( PagedFile pagedFile, long length, byte[] defaultValue, long base ) throws IOException + { + // '0' default value means we skip filling it out in super + super( pagedFile, defaultValue.length, length, 0, base ); + this.defaultValue = defaultValue; + setDefaultValue( writeCursor( 0 ), -1 ); + } + + @Override + protected void fillPageWithDefaultValue( PageCursor writeCursor, long ignoredDefaultValue, int pageSize ) + { + for ( int i = 0; i < entriesPerPage; i++ ) + { + writeCursor.putBytes( this.defaultValue ); + } + } + + @Override + public void swap( long fromIndex, long toIndex ) + { + byte[] a = defaultValue.clone(); + byte[] b = defaultValue.clone(); + get( fromIndex, a ); + get( toIndex, b ); + set( fromIndex, b ); + set( toIndex, a ); + } + + @Override + public void get( long index, byte[] into ) + { + long pageId = pageId( index ); + int offset = offset( index ); + try + { + PageCursor cursor = readCursor( pageId ); + do + { + for ( int i = 0; i < into.length; i++ ) + { + into[i] = cursor.getByte( offset + i ); + } + } + while ( cursor.shouldRetry() ); + checkBounds( cursor ); + } + catch ( IOException e ) + { + throw new UncheckedIOException( e ); + } + } + + @Override + public byte getByte( long index, int offset ) + { + long pageId = pageId( index ); + offset += offset( index ); + try + { + PageCursor cursor = readCursor( pageId ); + byte result; + do + { + result = cursor.getByte( offset ); + } + while ( cursor.shouldRetry() ); + checkBounds( cursor ); + return result; + } + catch ( IOException e ) + { + throw new UncheckedIOException( e ); + } + } + + @Override + public short getShort( long index, int offset ) + { + long pageId = pageId( index ); + offset += offset( index ); + try + { + PageCursor cursor = readCursor( pageId ); + short result; + do + { + result = cursor.getShort( offset ); + } + while ( cursor.shouldRetry() ); + checkBounds( cursor ); + return result; + } + catch ( IOException e ) + { + throw new UncheckedIOException( e ); + } + } + + @Override + public int getInt( long index, int offset ) + { + long pageId = pageId( index ); + offset += offset( index ); + try + { + PageCursor cursor = readCursor( pageId ); + int result; + do + { + result = cursor.getInt( offset ); + } + while ( cursor.shouldRetry() ); + checkBounds( cursor ); + return result; + } + catch ( IOException e ) + { + throw new UncheckedIOException( e ); + } + } + + @Override + public long get6ByteLong( long index, int offset ) + { + long pageId = pageId( index ); + offset += offset( index ); + try + { + PageCursor cursor = readCursor( pageId ); + long result; + do + { + long low4b = cursor.getInt( offset ) & 0xFFFFFFFFL; + long high2b = cursor.getShort( offset + Integer.BYTES ); + result = low4b | (high2b << 32); + } + while ( cursor.shouldRetry() ); + checkBounds( cursor ); + return result; + } + catch ( IOException e ) + { + throw new UncheckedIOException( e ); + } + } + + @Override + public long getLong( long index, int offset ) + { + long pageId = pageId( index ); + offset += offset( index ); + try + { + PageCursor cursor = readCursor( pageId ); + long result; + do + { + result = cursor.getLong( offset ); + } + while ( cursor.shouldRetry() ); + checkBounds( cursor ); + return result; + } + catch ( IOException e ) + { + throw new UncheckedIOException( e ); + } + } + + @Override + public void set( long index, byte[] value ) + { + assert value.length == entrySize; + long pageId = pageId( index ); + int offset = offset( index ); + try + { + PageCursor cursor = writeCursor( pageId ); + for ( int i = 0; i < value.length; i++ ) + { + cursor.putByte( offset + i, value[i] ); + } + checkBounds( cursor ); + } + catch ( IOException e ) + { + throw new UncheckedIOException( e ); + } + } + + @Override + public void setByte( long index, int offset, byte value ) + { + long pageId = pageId( index ); + offset += offset( index ); + try + { + PageCursor cursor = writeCursor( pageId ); + cursor.putByte( offset, value ); + checkBounds( cursor ); + } + catch ( IOException e ) + { + throw new UncheckedIOException( e ); + } + } + + @Override + public void setShort( long index, int offset, short value ) + { + long pageId = pageId( index ); + offset += offset( index ); + try + { + PageCursor cursor = writeCursor( pageId ); + cursor.putShort( offset, value ); + checkBounds( cursor ); + } + catch ( IOException e ) + { + throw new UncheckedIOException( e ); + } + } + + @Override + public void setInt( long index, int offset, int value ) + { + long pageId = pageId( index ); + offset += offset( index ); + try + { + PageCursor cursor = writeCursor( pageId ); + cursor.putInt( offset, value ); + checkBounds( cursor ); + } + catch ( IOException e ) + { + throw new UncheckedIOException( e ); + } + } + + @Override + public void set6ByteLong( long index, int offset, long value ) + { + long pageId = pageId( index ); + offset += offset( index ); + try + { + PageCursor cursor = writeCursor( pageId ); + cursor.putInt( offset, (int) value ); + cursor.putShort( offset + Integer.BYTES, (short) (value >>> 32) ); + checkBounds( cursor ); + } + catch ( IOException e ) + { + throw new UncheckedIOException( e ); + } + } + + @Override + public void setLong( long index, int offset, long value ) + { + long pageId = pageId( index ); + offset += offset( index ); + try + { + PageCursor cursor = writeCursor( pageId ); + cursor.putLong( offset, value ); + checkBounds( cursor ); + } + catch ( IOException e ) + { + throw new UncheckedIOException( e ); + } + } + + @Override + public int get3ByteInt( long index, int offset ) + { + + long pageId = pageId( index ); + offset += offset( index ); + try + { + PageCursor cursor = readCursor( pageId ); + int result; + do + { + int lowWord = cursor.getShort( offset ) & 0xFFFF; + byte highByte = cursor.getByte( offset + Short.BYTES ); + result = lowWord | (highByte << Short.SIZE); + } + while ( cursor.shouldRetry() ); + checkBounds( cursor ); + return result; + } + catch ( IOException e ) + { + throw new UncheckedIOException( e ); + } + } + + @Override + public void set3ByteInt( long index, int offset, int value ) + { + long pageId = pageId( index ); + offset += offset( index ); + try + { + PageCursor cursor = writeCursor( pageId ); + cursor.putShort( offset, (short) value ); + cursor.putByte( offset + Short.BYTES, (byte) (value >>> Short.SIZE) ); + checkBounds( cursor ); + } + catch ( IOException e ) + { + throw new UncheckedIOException( e ); + } + } +} diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/cache/PageCacheIntArray.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/cache/PageCacheIntArray.java index 468cba33927de..dd66967eadc01 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/cache/PageCacheIntArray.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/cache/PageCacheIntArray.java @@ -20,65 +20,55 @@ package org.neo4j.unsafe.impl.batchimport.cache; import java.io.IOException; +import java.io.UncheckedIOException; +import org.neo4j.io.pagecache.PageCursor; import org.neo4j.io.pagecache.PagedFile; -import static java.lang.Math.toIntExact; - public class PageCacheIntArray extends PageCacheNumberArray implements IntArray { - private static final int ENTRY_SIZE = Integer.BYTES; - private static final int ENTRIES_PER_PAGE = PAGE_SIZE / ENTRY_SIZE; - - public PageCacheIntArray( PagedFile pagedFile ) throws IOException + public PageCacheIntArray( PagedFile pagedFile, long length, long defaultValue, long base ) throws IOException { - super( pagedFile, ENTRIES_PER_PAGE, ENTRY_SIZE ); + super( pagedFile, Integer.BYTES, length, defaultValue, base ); } @Override public int get( long index ) { - long pageId = index / ENTRIES_PER_PAGE; - int offset = toIntExact( index % ENTRIES_PER_PAGE ) * ENTRY_SIZE; - if ( writeCursor.getCurrentPageId() == pageId ) - { - // We have to read from the write cursor, since the write cursor is on it - return writeCursor.getInt( offset ); - } - - // Go ahead and read from the read cursor + long pageId = pageId( index ); + int offset = offset( index ); try { - goTo( readCursor, pageId ); + PageCursor cursor = readCursor( pageId ); int result; do { - result = readCursor.getInt( offset ); + result = cursor.getInt( offset ); } - while ( readCursor.shouldRetry() ); - checkBounds( readCursor ); + while ( cursor.shouldRetry() ); + checkBounds( cursor ); return result; } catch ( IOException e ) { - throw new RuntimeException( e ); + throw new UncheckedIOException( e ); } } @Override public void set( long index, int value ) { - long pageId = index / ENTRIES_PER_PAGE; - int offset = toIntExact( index % ENTRIES_PER_PAGE ) * ENTRY_SIZE; + long pageId = pageId( index ); + int offset = offset( index ); try { - goTo( writeCursor, pageId ); - writeCursor.putInt( offset, value ); - checkBounds( writeCursor ); + PageCursor cursor = writeCursor( pageId ); + cursor.putInt( offset, value ); + checkBounds( cursor ); } catch ( IOException e ) { - throw new RuntimeException( e ); + throw new UncheckedIOException( e ); } } } diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/cache/PageCacheLongArray.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/cache/PageCacheLongArray.java index babc1d4a28a03..e4e0f53385023 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/cache/PageCacheLongArray.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/cache/PageCacheLongArray.java @@ -20,65 +20,55 @@ package org.neo4j.unsafe.impl.batchimport.cache; import java.io.IOException; +import java.io.UncheckedIOException; +import org.neo4j.io.pagecache.PageCursor; import org.neo4j.io.pagecache.PagedFile; -import static java.lang.Math.toIntExact; - public class PageCacheLongArray extends PageCacheNumberArray implements LongArray { - private static final int ENTRY_SIZE = Long.BYTES; - private static final int ENTRIES_PER_PAGE = PAGE_SIZE / ENTRY_SIZE; - - public PageCacheLongArray( PagedFile pagedFile ) throws IOException + public PageCacheLongArray( PagedFile pagedFile, long length, long defaultValue, long base ) throws IOException { - super( pagedFile, ENTRIES_PER_PAGE, ENTRY_SIZE ); + super( pagedFile, Long.BYTES, length, defaultValue, base ); } @Override public long get( long index ) { - long pageId = index / ENTRIES_PER_PAGE; - int offset = toIntExact( index % ENTRIES_PER_PAGE ) * ENTRY_SIZE; - if ( writeCursor.getCurrentPageId() == pageId ) - { - // We have to read from the write cursor, since the write cursor is on it - return writeCursor.getLong( offset ); - } - - // Go ahead and read from the read cursor + long pageId = pageId( index ); + int offset = offset( index ); try { - goTo( readCursor, pageId ); + PageCursor cursor = readCursor( pageId ); long result; do { - result = readCursor.getLong( offset ); + result = cursor.getLong( offset ); } - while ( readCursor.shouldRetry() ); - checkBounds( readCursor ); + while ( cursor.shouldRetry() ); + checkBounds( cursor ); return result; } catch ( IOException e ) { - throw new RuntimeException( e ); + throw new UncheckedIOException( e ); } } @Override public void set( long index, long value ) { - long pageId = index / ENTRIES_PER_PAGE; - int offset = toIntExact( index % ENTRIES_PER_PAGE ) * ENTRY_SIZE; + long pageId = pageId( index ); + int offset = offset( index ); try { - goTo( writeCursor, pageId ); - writeCursor.putLong( offset, value ); - checkBounds( writeCursor ); + PageCursor cursor = writeCursor( pageId ); + cursor.putLong( offset, value ); + checkBounds( cursor ); } catch ( IOException e ) { - throw new RuntimeException( e ); + throw new UncheckedIOException( e ); } } } diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/cache/PageCacheNumberArray.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/cache/PageCacheNumberArray.java index 778999e39cb92..106f8eb7b9d4a 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/cache/PageCacheNumberArray.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/cache/PageCacheNumberArray.java @@ -20,62 +20,141 @@ package org.neo4j.unsafe.impl.batchimport.cache; import java.io.IOException; +import java.io.UncheckedIOException; import org.neo4j.io.pagecache.PageCursor; import org.neo4j.io.pagecache.PagedFile; -import static org.neo4j.io.ByteUnit.kibiBytes; +import static java.lang.Math.toIntExact; +import static org.neo4j.io.pagecache.PagedFile.PF_NO_GROW; +import static org.neo4j.io.pagecache.PagedFile.PF_SHARED_READ_LOCK; +import static org.neo4j.io.pagecache.PagedFile.PF_SHARED_WRITE_LOCK; +// todo make safe for concurrent access public abstract class PageCacheNumberArray> implements NumberArray { - static final int PAGE_SIZE = (int) kibiBytes( 8 ); - protected final PagedFile pagedFile; - protected final PageCursor readCursor; - protected final PageCursor writeCursor; - private final int entriesPerPage; - private final int entrySize; + protected final int entriesPerPage; + protected final int entrySize; + private final PageCursor readCursor; + private final PageCursor writeCursor; + private final long length; + private final long defaultValue; + private final long base; + private boolean closed; - public PageCacheNumberArray( PagedFile pagedFile, int entriesPerPage, int entrySize ) throws IOException + public PageCacheNumberArray( PagedFile pagedFile, int entrySize, long length, + long defaultValue, long base ) throws IOException { this.pagedFile = pagedFile; - this.entriesPerPage = entriesPerPage; this.entrySize = entrySize; - this.readCursor = pagedFile.io( 0, PagedFile.PF_SHARED_READ_LOCK ); - this.writeCursor = pagedFile.io( 0, PagedFile.PF_SHARED_WRITE_LOCK ); - goTo( writeCursor, 0 ); + this.entriesPerPage = pagedFile.pageSize() / entrySize; + this.readCursor = pagedFile.io( 0, PF_SHARED_READ_LOCK ); + this.writeCursor = pagedFile.io( 0, PF_SHARED_WRITE_LOCK | PF_NO_GROW ); + this.length = length; + this.defaultValue = defaultValue; + this.base = base; + + try ( PageCursor cursorToSetLength = pagedFile.io( 0, PF_SHARED_WRITE_LOCK ) ) + { + setLength( cursorToSetLength, length ); + } + + if ( defaultValue != 0 ) + { + setDefaultValue( writeCursor, defaultValue ); + } } - @Override - public long length() + private void setLength( PageCursor cursor, long length ) throws IOException { - try + goTo( cursor, ( length - 1 ) / entriesPerPage ); + } + + protected long pageId( long index ) + { + return rebase( index ) / entriesPerPage; + } + + protected int offset( long index ) + { + return toIntExact( rebase( index ) % entriesPerPage ) * entrySize; + } + + private long rebase( long index ) + { + return index - base; + } + + protected void setDefaultValue( PageCursor writeCursor, long defaultValue ) throws IOException + { + if ( entrySize == Integer.BYTES ) { - return pagedFile.getLastPageId() * entriesPerPage; + defaultValue |= defaultValue << 32; } - catch ( IOException e ) + goTo( writeCursor, 0 ); + int pageSize = pagedFile.pageSize(); + fillPageWithDefaultValue( writeCursor, defaultValue, pageSize ); + if ( pageId( length - 1 ) > 0 ) { - throw new RuntimeException( e ); + try ( PageCursor cursor = pagedFile.io( 1, PF_NO_GROW | PF_SHARED_WRITE_LOCK ) ) + { + while ( cursor.next() ) + { + writeCursor.copyTo( 0, cursor, 0, pageSize ); + } + } + } + } + + protected void fillPageWithDefaultValue( PageCursor writeCursor, long defaultValue, int pageSize ) + { + int longsInPage = pageSize / Long.BYTES; + for ( int i = 0; i < longsInPage; i++ ) + { + writeCursor.putLong( defaultValue ); } } + @Override + public long length() + { + return length; + } + @Override public void clear() { + try + { + setDefaultValue( writeCursor, defaultValue ); + } + catch ( IOException e ) + { + throw new UncheckedIOException( e ); + } } @Override public void close() { - readCursor.close(); - writeCursor.close(); + if ( closed ) + { + return; + } try { + readCursor.close(); + writeCursor.close(); pagedFile.close(); } catch ( IOException e ) { - throw new RuntimeException( e ); + throw new UncheckedIOException( e ); + } + finally + { + closed = true; } } @@ -99,11 +178,26 @@ protected void checkBounds( PageCursor cursor ) } } - protected void goTo( PageCursor cursor, long pageId ) throws IOException + protected PageCursor goTo( PageCursor cursor, long pageId ) throws IOException { if ( !cursor.next( pageId ) ) { throw new IllegalStateException(); } + return cursor; + } + + protected PageCursor writeCursor( long pageId ) throws IOException + { + return goTo( writeCursor, pageId ); + } + + protected PageCursor readCursor( long pageId ) throws IOException + { + if ( writeCursor.getCurrentPageId() == pageId ) + { + return writeCursor; + } + return goTo( readCursor, pageId ); } } diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/cache/PageCachedNumberArrayFactory.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/cache/PageCachedNumberArrayFactory.java new file mode 100644 index 0000000000000..60724a77cd564 --- /dev/null +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/cache/PageCachedNumberArrayFactory.java @@ -0,0 +1,87 @@ +/* + * Copyright (c) 2002-2017 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ +package org.neo4j.unsafe.impl.batchimport.cache; + +import java.io.File; +import java.io.IOException; +import java.io.UncheckedIOException; + +import org.neo4j.io.pagecache.PageCache; +import org.neo4j.io.pagecache.PagedFile; + +import static java.nio.file.StandardOpenOption.CREATE; +import static java.nio.file.StandardOpenOption.DELETE_ON_CLOSE; + +public class PageCachedNumberArrayFactory extends NumberArrayFactory.Adapter +{ + private final PageCache pageCache; + private final File storeDir; + + public PageCachedNumberArrayFactory( PageCache pageCache, File storeDir ) + { + this.pageCache = pageCache; + this.storeDir = storeDir; + } + + @Override + public IntArray newIntArray( long length, int defaultValue, long base ) + { + try + { + File tempFile = File.createTempFile( "intArray", ".tmp", storeDir ); + PagedFile pagedFile = pageCache.map( tempFile, pageCache.pageSize(), DELETE_ON_CLOSE, CREATE ); + return new PageCacheIntArray( pagedFile, length, defaultValue, base ); + } + catch ( IOException e ) + { + throw new UncheckedIOException( e ); + } + } + + @Override + public LongArray newLongArray( long length, long defaultValue, long base ) + { + try + { + File tempFile = File.createTempFile( "longArray", ".tmp", storeDir ); + PagedFile pagedFile = pageCache.map( tempFile, pageCache.pageSize(), DELETE_ON_CLOSE, CREATE ); + return new PageCacheLongArray( pagedFile, length, defaultValue, base ); + } + catch ( IOException e ) + { + throw new UncheckedIOException( e ); + } + } + + @Override + public ByteArray newByteArray( long length, byte[] defaultValue, long base ) + { + try + { + File tempFile = File.createTempFile( "byteArray", ".tmp", storeDir ); + PagedFile pagedFile = pageCache.map( tempFile, pageCache.pageSize(), DELETE_ON_CLOSE, CREATE ); + return new PageCacheByteArray( pagedFile, length, defaultValue, base ); + } + catch ( IOException e ) + { + throw new UncheckedIOException( e ); + } + } +} diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/cache/idmapping/string/AbstractTracker.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/cache/idmapping/string/AbstractTracker.java index cc69c46207c59..e8f6c99d686c6 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/cache/idmapping/string/AbstractTracker.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/cache/idmapping/string/AbstractTracker.java @@ -43,9 +43,9 @@ public void acceptMemoryStatsVisitor( MemoryStatsVisitor visitor ) } @Override - public void swap( long fromIndex, long toIndex, int count ) + public void swap( long fromIndex, long toIndex ) { - array.swap( fromIndex, toIndex, count ); + array.swap( fromIndex, toIndex ); } @Override diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/cache/idmapping/string/EncodingIdMapper.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/cache/idmapping/string/EncodingIdMapper.java index a02bc1d4aec98..f7867ef337110 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/cache/idmapping/string/EncodingIdMapper.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/cache/idmapping/string/EncodingIdMapper.java @@ -397,7 +397,7 @@ dataIndexA, groupOf( dataIndexA ).id(), if ( dataIndexA > dataIndexB ) { // Swap so that lower tracker index means lower data index. TODO Why do we do this? - trackerCache.swap( i, i + 1, 1 ); + trackerCache.swap( i, i + 1 ); } if ( collision != ID_NOT_FOUND ) diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/cache/idmapping/string/ParallelSort.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/cache/idmapping/string/ParallelSort.java index fdb79f41ea2a6..239dbef124bf6 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/cache/idmapping/string/ParallelSort.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/cache/idmapping/string/ParallelSort.java @@ -273,7 +273,7 @@ private long partition( long leftIndex, long rightIndex, long pivotIndex ) long li = leftIndex, ri = rightIndex - 2, pi = pivotIndex; long pivot = clearCollision( dataCache.get( tracker.get( pi ) ) ); // save pivot in last index - tracker.swap( pi, rightIndex - 1, 1 ); + tracker.swap( pi, rightIndex - 1 ); long left = clearCollision( dataCache.get( tracker.get( li ) ) ); long right = clearCollision( dataCache.get( tracker.get( ri ) ) ); while ( li < ri ) @@ -288,7 +288,7 @@ else if ( comparator.ge( right, pivot ) ) } else { // this value is on the wrong side of the pivot, swapping - tracker.swap( li, ri, 1 ); + tracker.swap( li, ri ); long temp = left; left = right; right = temp; @@ -300,7 +300,7 @@ else if ( comparator.ge( right, pivot ) ) partingIndex++; } // restore pivot - tracker.swap( rightIndex - 1, partingIndex, 1 ); + tracker.swap( rightIndex - 1, partingIndex ); return partingIndex; } diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/cache/idmapping/string/Tracker.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/cache/idmapping/string/Tracker.java index e2730eccf6d14..8efdc83f8c7a8 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/cache/idmapping/string/Tracker.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/cache/idmapping/string/Tracker.java @@ -52,9 +52,8 @@ public interface Tracker extends MemoryStatsVisitor.Visitable, AutoCloseable * * @param fromIndex index to swap from. * @param toIndex index to swap to. - * @param count number of items to swap. */ - void swap( long fromIndex, long toIndex, int count ); + void swap( long fromIndex, long toIndex ); /** * Sets {@code value} at the specified {@code index}. diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/input/Input.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/input/Input.java index 688a0b69f6e15..b3bb594801fca 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/input/Input.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/input/Input.java @@ -21,6 +21,7 @@ import org.neo4j.unsafe.impl.batchimport.BatchImporter; import org.neo4j.unsafe.impl.batchimport.InputIterable; +import org.neo4j.unsafe.impl.batchimport.cache.NumberArrayFactory; import org.neo4j.unsafe.impl.batchimport.cache.idmapping.IdGenerator; import org.neo4j.unsafe.impl.batchimport.cache.idmapping.IdMapper; @@ -50,8 +51,9 @@ public interface Input * @return {@link IdMapper} which will get populated by {@link InputNode#id() input node ids} * and later queried by {@link InputRelationship#startNode()} and {@link InputRelationship#endNode()} ids * to resolve potentially temporary input node ids to actual node ids in the database. + * @param numberArrayFactory The factory for creating data-structures to use for caching internally in the IdMapper. */ - IdMapper idMapper(); + IdMapper idMapper( NumberArrayFactory numberArrayFactory ); /** * @return {@link IdGenerator} which is responsible for generating actual node ids from input node ids. diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/input/Inputs.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/input/Inputs.java index 779b13fb3b9f6..db2634b6567fa 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/input/Inputs.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/input/Inputs.java @@ -22,6 +22,7 @@ import java.io.File; import org.neo4j.unsafe.impl.batchimport.InputIterable; +import org.neo4j.unsafe.impl.batchimport.cache.NumberArrayFactory; import org.neo4j.unsafe.impl.batchimport.cache.idmapping.IdGenerator; import org.neo4j.unsafe.impl.batchimport.cache.idmapping.IdMapper; import org.neo4j.unsafe.impl.batchimport.input.csv.Configuration; @@ -58,7 +59,7 @@ public InputIterable nodes() } @Override - public IdMapper idMapper() + public IdMapper idMapper( NumberArrayFactory numberArrayFactory ) { return idMapper; } diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/input/csv/CsvInput.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/input/csv/CsvInput.java index 4221f68fcdfc1..aae19b16b7326 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/input/csv/CsvInput.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/input/csv/CsvInput.java @@ -25,6 +25,7 @@ import org.neo4j.kernel.impl.util.Validators; import org.neo4j.unsafe.impl.batchimport.InputIterable; import org.neo4j.unsafe.impl.batchimport.InputIterator; +import org.neo4j.unsafe.impl.batchimport.cache.NumberArrayFactory; import org.neo4j.unsafe.impl.batchimport.cache.idmapping.IdGenerator; import org.neo4j.unsafe.impl.batchimport.cache.idmapping.IdMapper; import org.neo4j.unsafe.impl.batchimport.input.Collector; @@ -148,9 +149,9 @@ public boolean supportsMultiplePasses() } @Override - public IdMapper idMapper() + public IdMapper idMapper( NumberArrayFactory numberArrayFactory ) { - return idType.idMapper(); + return idType.idMapper( numberArrayFactory ); } @Override diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/input/csv/IdType.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/input/csv/IdType.java index 34c106bbd276d..5433843741c58 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/input/csv/IdType.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/input/csv/IdType.java @@ -21,14 +21,13 @@ import org.neo4j.csv.reader.Extractor; import org.neo4j.csv.reader.Extractors; +import org.neo4j.unsafe.impl.batchimport.cache.NumberArrayFactory; import org.neo4j.unsafe.impl.batchimport.cache.idmapping.IdGenerator; import org.neo4j.unsafe.impl.batchimport.cache.idmapping.IdGenerators; import org.neo4j.unsafe.impl.batchimport.cache.idmapping.IdMapper; import org.neo4j.unsafe.impl.batchimport.cache.idmapping.IdMappers; import org.neo4j.unsafe.impl.batchimport.input.InputNode; -import static org.neo4j.unsafe.impl.batchimport.cache.NumberArrayFactory.AUTO; - /** * Defines different types that input ids can come in. Enum names in here are user facing. * @@ -49,9 +48,9 @@ public Extractor extractor( Extractors extractors ) } @Override - public IdMapper idMapper() + public IdMapper idMapper( NumberArrayFactory numberArrayFactory ) { - return IdMappers.strings( AUTO ); + return IdMappers.strings( numberArrayFactory ); } @Override @@ -74,9 +73,9 @@ public Extractor extractor( Extractors extractors ) } @Override - public IdMapper idMapper() + public IdMapper idMapper( NumberArrayFactory numberArrayFactory ) { - return IdMappers.longs( AUTO ); + return IdMappers.longs( numberArrayFactory ); } @Override @@ -99,7 +98,7 @@ public Extractor extractor( Extractors extractors ) } @Override - public IdMapper idMapper() + public IdMapper idMapper( NumberArrayFactory numberArrayFactory ) { return IdMappers.actual(); } @@ -118,7 +117,7 @@ public IdGenerator idGenerator() this.idsAreExternal = idsAreExternal; } - public abstract IdMapper idMapper(); + public abstract IdMapper idMapper( NumberArrayFactory numberArrayFactory ); public abstract IdGenerator idGenerator(); diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/store/BatchingNeoStores.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/store/BatchingNeoStores.java index bc87890b5d602..4255469d327e2 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/store/BatchingNeoStores.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/store/BatchingNeoStores.java @@ -283,6 +283,11 @@ public CountsTracker getCountsStore() return neoStores.getCounts(); } + public PageCache getPageCache() + { + return pageCache; + } + @Override public void close() throws IOException { diff --git a/community/kernel/src/test/java/org/neo4j/kernel/impl/store/counts/CountsComputerTest.java b/community/kernel/src/test/java/org/neo4j/kernel/impl/store/counts/CountsComputerTest.java index ed33e74136db2..3bc3dbaf47e36 100644 --- a/community/kernel/src/test/java/org/neo4j/kernel/impl/store/counts/CountsComputerTest.java +++ b/community/kernel/src/test/java/org/neo4j/kernel/impl/store/counts/CountsComputerTest.java @@ -55,6 +55,7 @@ import org.neo4j.test.rule.PageCacheRule; import org.neo4j.test.rule.TestDirectory; import org.neo4j.test.rule.fs.EphemeralFileSystemRule; +import org.neo4j.unsafe.impl.batchimport.cache.NumberArrayFactory; import static org.junit.Assert.assertEquals; import static org.neo4j.kernel.impl.store.counts.keys.CountsKeyFactory.nodeKey; @@ -327,7 +328,8 @@ private void rebuildCounts( long lastCommittedTransactionId ) throws IOException int highLabelId = (int) neoStores.getLabelTokenStore().getHighId(); int highRelationshipTypeId = (int) neoStores.getRelationshipTypeTokenStore().getHighId(); CountsComputer countsComputer = new CountsComputer( - lastCommittedTransactionId, nodeStore, relationshipStore, highLabelId, highRelationshipTypeId ); + lastCommittedTransactionId, nodeStore, relationshipStore, highLabelId, highRelationshipTypeId, + NumberArrayFactory.AUTO ); CountsTracker countsTracker = createCountsTracker(); life.add( countsTracker.setInitializer( countsComputer ) ); } diff --git a/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/RelationshipGroupDefragmenterTest.java b/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/RelationshipGroupDefragmenterTest.java index 407e04d7c935b..90c5f58512fe9 100644 --- a/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/RelationshipGroupDefragmenterTest.java +++ b/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/RelationshipGroupDefragmenterTest.java @@ -59,6 +59,7 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.neo4j.kernel.impl.store.record.RecordLoad.CHECK; +import static org.neo4j.unsafe.impl.batchimport.cache.NumberArrayFactory.AUTO; @RunWith( Parameterized.class ) public class RelationshipGroupDefragmenterTest @@ -192,7 +193,7 @@ private void defrag( int nodeCount, RecordStore groupSt { Monitor monitor = mock( Monitor.class ); RelationshipGroupDefragmenter defragmenter = new RelationshipGroupDefragmenter( CONFIG, - ExecutionMonitors.invisible(), monitor ); + ExecutionMonitors.invisible(), monitor, AUTO ); // Calculation below correlates somewhat to calculation in RelationshipGroupDefragmenter. // Anyway we verify below that we exercise the multi-pass bit, which is what we want diff --git a/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/cache/ByteArrayTest.java b/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/cache/ByteArrayTest.java index d8a445e1fe519..59583a161d7f8 100644 --- a/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/cache/ByteArrayTest.java +++ b/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/cache/ByteArrayTest.java @@ -20,6 +20,7 @@ package org.neo4j.unsafe.impl.batchimport.cache; import org.junit.After; +import org.junit.AfterClass; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -27,27 +28,56 @@ import org.junit.runners.Parameterized.Parameter; import org.junit.runners.Parameterized.Parameters; +import java.io.File; +import java.io.IOException; import java.util.Arrays; import java.util.Collection; +import java.util.concurrent.ThreadLocalRandom; import java.util.function.Supplier; +import org.neo4j.io.pagecache.PageCache; + +import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; +import static org.neo4j.unsafe.impl.batchimport.cache.NumberArrayFactory.AUTO; +import static org.neo4j.unsafe.impl.batchimport.cache.NumberArrayFactory.HEAP; +import static org.neo4j.unsafe.impl.batchimport.cache.NumberArrayFactory.OFF_HEAP; +import static org.neo4j.unsafe.impl.batchimport.cache.NumberArrayFactory.autoWithPageCacheFallback; @RunWith( Parameterized.class ) -public class ByteArrayTest +public class ByteArrayTest extends NumberArrayPageCacheTestSupport { private static final byte[] DEFAULT = new byte[25]; + private static final int LENGTH = 1_000; + private static Fixture fixture; @Parameters - public static Collection> data() + public static Collection> data() throws IOException { + fixture = prepareDirectoryAndPageCache( ByteArrayTest.class ); + PageCache pageCache = fixture.pageCache; + File dir = fixture.directory; + NumberArrayFactory autoWithPageCacheFallback = autoWithPageCacheFallback( pageCache, dir ); + NumberArrayFactory pageCacheArrayFactory = new PageCachedNumberArrayFactory( pageCache, dir ); + int chunkSize = LENGTH / 10; return Arrays.asList( - () -> NumberArrayFactory.HEAP.newByteArray( 1_000, DEFAULT ), - () -> NumberArrayFactory.HEAP.newDynamicByteArray( 100, DEFAULT ), - () -> NumberArrayFactory.OFF_HEAP.newByteArray( 1_000, DEFAULT ), - () -> NumberArrayFactory.OFF_HEAP.newDynamicByteArray( 100, DEFAULT ), - () -> NumberArrayFactory.AUTO.newByteArray( 1_000, DEFAULT ), - () -> NumberArrayFactory.AUTO.newDynamicByteArray( 100, DEFAULT ) ); + () -> HEAP.newByteArray( LENGTH, DEFAULT ), + () -> HEAP.newDynamicByteArray( chunkSize, DEFAULT ), + () -> OFF_HEAP.newByteArray( LENGTH, DEFAULT ), + () -> OFF_HEAP.newDynamicByteArray( chunkSize, DEFAULT ), + () -> AUTO.newByteArray( LENGTH, DEFAULT ), + () -> AUTO.newDynamicByteArray( chunkSize, DEFAULT ), + () -> autoWithPageCacheFallback.newByteArray( LENGTH, DEFAULT ), + () -> autoWithPageCacheFallback.newDynamicByteArray( chunkSize, DEFAULT ), + () -> pageCacheArrayFactory.newByteArray( LENGTH, DEFAULT ), + () -> pageCacheArrayFactory.newDynamicByteArray( chunkSize, DEFAULT ) + ); + } + + @AfterClass + public static void closeFixture() throws Exception + { + fixture.close(); } @Parameter @@ -69,19 +99,58 @@ public void after() @Test public void shouldSetAndGetBasicTypes() throws Exception { - // WHEN - array.setByte( 0, 0, (byte) 123 ); - array.setShort( 0, 1, (short) 1234 ); - array.setInt( 0, 5, 12345 ); - array.setLong( 0, 9, Long.MAX_VALUE - 100 ); - array.set3ByteInt( 0, 17, 76767 ); + int index = 0; + byte[] actualBytes = new byte[DEFAULT.length]; + byte[] expectedBytes = new byte[actualBytes.length]; + ThreadLocalRandom.current().nextBytes( actualBytes ); - // THEN - assertEquals( (byte) 123, array.getByte( 0, 0 ) ); - assertEquals( (short) 1234, array.getShort( 0, 1 ) ); - assertEquals( 12345, array.getInt( 0, 5 ) ); - assertEquals( Long.MAX_VALUE - 100, array.getLong( 0, 9 ) ); - assertEquals( 76767, array.get3ByteInt( 0, 17 ) ); + int len = LENGTH - 1; // subtract one because we access TWO elements. + for ( int i = 0; i < len; i++ ) + { + try + { + // WHEN + setSimpleValues( index ); + setArray( index + 1, actualBytes ); + + // THEN + verifySimpleValues( index ); + verifyArray( index + 1, actualBytes, expectedBytes ); + } + catch ( Throwable throwable ) + { + throw new AssertionError( "Failure at index " + i, throwable ); + } + } + } + + private void setSimpleValues( int index ) + { + array.setByte( index, 0, (byte) 123 ); + array.setShort( index, 1, (short) 1234 ); + array.setInt( index, 5, 12345 ); + array.setLong( index, 9, Long.MAX_VALUE - 100 ); + array.set3ByteInt( index, 17, 76767 ); + } + + private void setArray( int index, byte[] bytes ) + { + array.set( index, bytes ); + } + + private void verifySimpleValues( int index ) + { + assertEquals( (byte) 123, array.getByte( index, 0 ) ); + assertEquals( (short) 1234, array.getShort( index, 1 ) ); + assertEquals( 12345, array.getInt( index, 5 ) ); + assertEquals( Long.MAX_VALUE - 100, array.getLong( index, 9 ) ); + assertEquals( 76767, array.get3ByteInt( index, 17 ) ); + } + + private void verifyArray( int index, byte[] actualBytes, byte[] scratchBuffer ) + { + array.get( index, scratchBuffer ); + assertArrayEquals( actualBytes, scratchBuffer ); } @Test @@ -95,4 +164,14 @@ public void shouldDetectMinusOne() throws Exception assertEquals( -1L, array.get6ByteLong( 10, 2 ) ); assertEquals( -1L, array.get6ByteLong( 10, 8 ) ); } + + @Test + public void shouldHandleMultipleCallsToClose() throws Exception + { + // WHEN + array.close(); + + // THEN should also work + array.close(); + } } diff --git a/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/cache/IntArrayTest.java b/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/cache/IntArrayTest.java index e4c0b3c677f14..670edcf232d36 100644 --- a/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/cache/IntArrayTest.java +++ b/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/cache/IntArrayTest.java @@ -20,21 +20,31 @@ package org.neo4j.unsafe.impl.batchimport.cache; import org.junit.After; +import org.junit.AfterClass; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; import org.junit.runners.Parameterized.Parameters; +import java.io.File; +import java.io.IOException; import java.util.Arrays; import java.util.Collection; import java.util.Random; +import org.neo4j.io.pagecache.PageCache; + import static java.lang.System.currentTimeMillis; import static org.junit.Assert.assertEquals; +import static org.neo4j.unsafe.impl.batchimport.cache.NumberArrayFactory.HEAP; +import static org.neo4j.unsafe.impl.batchimport.cache.NumberArrayFactory.OFF_HEAP; +import static org.neo4j.unsafe.impl.batchimport.cache.NumberArrayFactory.autoWithPageCacheFallback; @RunWith( Parameterized.class ) -public class IntArrayTest +public class IntArrayTest extends NumberArrayPageCacheTestSupport { + private static Fixture fixture; + @Test public void shouldHandleSomeRandomSetAndGet() throws Exception { @@ -62,29 +72,49 @@ public void shouldHandleSomeRandomSetAndGet() throws Exception assertEquals( "Seed:" + seed, expected[index], array.get( index ) ); break; default: // swap - int items = Math.min( random.nextInt( 10 ) + 1, length - index ); - int toIndex = (index + length / 2) % (length - items); - array.swap( index, toIndex, items ); - swap( expected, index, toIndex, items ); + int toIndex = random.nextInt( length ); + array.swap( index, toIndex ); + swap( expected, index, toIndex ); break; } } } - private void swap( long[] expected, int fromIndex, int toIndex, int items ) + @Test + public void shouldHandleMultipleCallsToClose() throws Exception { - for ( int i = 0; i < items; i++ ) - { - long fromValue = expected[fromIndex + i]; - expected[fromIndex + i] = expected[toIndex + i]; - expected[toIndex + i] = fromValue; - } + // GIVEN + NumberArray array = newArray( 10, -1 ); + + // WHEN + array.close(); + + // THEN should also work + array.close(); + } + + private void swap( long[] expected, int fromIndex, int toIndex ) + { + long fromValue = expected[fromIndex]; + expected[fromIndex] = expected[toIndex]; + expected[toIndex] = fromValue; } @Parameters - public static Collection data() + public static Collection data() throws IOException + { + fixture = prepareDirectoryAndPageCache( IntArrayTest.class ); + PageCache pageCache = fixture.pageCache; + File dir = fixture.directory; + NumberArrayFactory autoWithPageCacheFallback = autoWithPageCacheFallback( pageCache, dir ); + NumberArrayFactory pageCacheArrayFactory = new PageCachedNumberArrayFactory( pageCache, dir ); + return Arrays.asList( HEAP, OFF_HEAP, autoWithPageCacheFallback, pageCacheArrayFactory ); + } + + @AfterClass + public static void closeFixture() throws Exception { - return Arrays.asList( NumberArrayFactory.HEAP, NumberArrayFactory.OFF_HEAP ); + fixture.close(); } public IntArrayTest( NumberArrayFactory factory ) diff --git a/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/cache/LongArrayTest.java b/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/cache/LongArrayTest.java index aa8aaa82070f3..beab2f1929d90 100644 --- a/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/cache/LongArrayTest.java +++ b/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/cache/LongArrayTest.java @@ -20,21 +20,31 @@ package org.neo4j.unsafe.impl.batchimport.cache; import org.junit.After; +import org.junit.AfterClass; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; import org.junit.runners.Parameterized.Parameters; +import java.io.File; +import java.io.IOException; import java.util.Arrays; import java.util.Collection; import java.util.Random; +import org.neo4j.io.pagecache.PageCache; + import static java.lang.System.currentTimeMillis; import static org.junit.Assert.assertEquals; +import static org.neo4j.unsafe.impl.batchimport.cache.NumberArrayFactory.HEAP; +import static org.neo4j.unsafe.impl.batchimport.cache.NumberArrayFactory.OFF_HEAP; +import static org.neo4j.unsafe.impl.batchimport.cache.NumberArrayFactory.autoWithPageCacheFallback; @RunWith( Parameterized.class ) -public class LongArrayTest +public class LongArrayTest extends NumberArrayPageCacheTestSupport { + private static Fixture fixture; + @Test public void shouldHandleSomeRandomSetAndGet() throws Exception { @@ -62,10 +72,9 @@ public void shouldHandleSomeRandomSetAndGet() throws Exception assertEquals( "Seed:" + seed, expected[index], array.get( index ) ); break; default: // swap - int items = Math.min( random.nextInt( 10 ) + 1, length - index ); - int toIndex = (index + length / 2) % (length - items); - array.swap( index, toIndex, items ); - swap( expected, index, toIndex, items ); + int toIndex = random.nextInt( length ); + array.swap( index, toIndex ); + swap( expected, index, toIndex ); break; } } @@ -75,7 +84,7 @@ public void shouldHandleSomeRandomSetAndGet() throws Exception public void shouldHandleMultipleCallsToClose() throws Exception { // GIVEN - LongArray array = newArray( 10, -1 ); + NumberArray array = newArray( 10, -1 ); // WHEN array.close(); @@ -84,20 +93,28 @@ public void shouldHandleMultipleCallsToClose() throws Exception array.close(); } - private void swap( long[] expected, int fromIndex, int toIndex, int items ) + private void swap( long[] expected, int fromIndex, int toIndex ) { - for ( int i = 0; i < items; i++ ) - { - long fromValue = expected[fromIndex + i]; - expected[fromIndex + i] = expected[toIndex + i]; - expected[toIndex + i] = fromValue; - } + long fromValue = expected[fromIndex]; + expected[fromIndex] = expected[toIndex]; + expected[toIndex] = fromValue; } @Parameters - public static Collection data() + public static Collection data() throws IOException + { + fixture = prepareDirectoryAndPageCache( LongArrayTest.class ); + PageCache pageCache = fixture.pageCache; + File dir = fixture.directory; + NumberArrayFactory autoWithPageCacheFallback = autoWithPageCacheFallback( pageCache, dir ); + NumberArrayFactory pageCacheArrayFactory = new PageCachedNumberArrayFactory( pageCache, dir ); + return Arrays.asList( HEAP, OFF_HEAP, autoWithPageCacheFallback, pageCacheArrayFactory ); + } + + @AfterClass + public static void closeFixture() throws Exception { - return Arrays.asList( NumberArrayFactory.HEAP, NumberArrayFactory.OFF_HEAP ); + fixture.close(); } public LongArrayTest( NumberArrayFactory factory ) diff --git a/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/cache/NumberArrayPageCacheTestSupport.java b/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/cache/NumberArrayPageCacheTestSupport.java new file mode 100644 index 0000000000000..6cf0ef8934aee --- /dev/null +++ b/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/cache/NumberArrayPageCacheTestSupport.java @@ -0,0 +1,63 @@ +/* + * Copyright (c) 2002-2017 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ +package org.neo4j.unsafe.impl.batchimport.cache; + +import java.io.File; +import java.io.IOException; + +import org.neo4j.io.fs.DefaultFileSystemAbstraction; +import org.neo4j.io.fs.FileSystemAbstraction; +import org.neo4j.io.pagecache.PageCache; +import org.neo4j.io.pagecache.impl.muninn.StandalonePageCacheFactory; +import org.neo4j.test.rule.TestDirectory; + +public class NumberArrayPageCacheTestSupport +{ + public static Fixture prepareDirectoryAndPageCache( Class testClass ) throws IOException + { + DefaultFileSystemAbstraction fileSystem = new DefaultFileSystemAbstraction(); + TestDirectory testDirectory = TestDirectory.testDirectory( testClass, fileSystem ); + File dir = testDirectory.prepareDirectoryForTest( "test" ); + fileSystem.mkdirs( dir ); + PageCache pageCache = StandalonePageCacheFactory.createPageCache( fileSystem ); + return new Fixture( pageCache, fileSystem, dir ); + } + + public static class Fixture implements AutoCloseable + { + public final PageCache pageCache; + public final FileSystemAbstraction fileSystem; + public final File directory; + + private Fixture( PageCache pageCache, FileSystemAbstraction fileSystem, File directory ) + { + this.pageCache = pageCache; + this.fileSystem = fileSystem; + this.directory = directory; + } + + @Override + public void close() throws Exception + { + pageCache.close(); + fileSystem.close(); + } + } +} diff --git a/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/cache/NumberArrayTest.java b/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/cache/NumberArrayTest.java index 0b0066a9377e4..2b9cc3f9d21f0 100644 --- a/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/cache/NumberArrayTest.java +++ b/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/cache/NumberArrayTest.java @@ -20,6 +20,7 @@ package org.neo4j.unsafe.impl.batchimport.cache; import org.junit.After; +import org.junit.AfterClass; import org.junit.Rule; import org.junit.Test; import org.junit.runner.RunWith; @@ -27,24 +28,29 @@ import org.junit.runners.Parameterized.Parameter; import org.junit.runners.Parameterized.Parameters; +import java.io.File; +import java.io.IOException; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; import java.util.Map; import java.util.function.Function; +import org.neo4j.io.pagecache.PageCache; import org.neo4j.test.rule.RandomRule; import static org.junit.Assert.assertEquals; -import static org.neo4j.helpers.ArrayUtil.array; import static org.neo4j.unsafe.impl.batchimport.cache.NumberArrayFactory.AUTO; import static org.neo4j.unsafe.impl.batchimport.cache.NumberArrayFactory.CHUNKED_FIXED_SIZE; import static org.neo4j.unsafe.impl.batchimport.cache.NumberArrayFactory.HEAP; import static org.neo4j.unsafe.impl.batchimport.cache.NumberArrayFactory.OFF_HEAP; +import static org.neo4j.unsafe.impl.batchimport.cache.NumberArrayFactory.autoWithPageCacheFallback; @RunWith( Parameterized.class ) -public class NumberArrayTest +public class NumberArrayTest extends NumberArrayPageCacheTestSupport { + private static Fixture fixture; + @FunctionalInterface interface Writer> { @@ -59,40 +65,58 @@ interface Reader> private static final int INDEXES = 50_000; - @Parameters - public static Collection arrays() + @Parameters( name = "{0}" ) + public static Collection arrays() throws IOException { + fixture = prepareDirectoryAndPageCache( NumberArrayTest.class ); + PageCache pageCache = fixture.pageCache; + File dir = fixture.directory; Collection list = new ArrayList<>(); - for ( NumberArrayFactory factory : array( HEAP, OFF_HEAP, AUTO, CHUNKED_FIXED_SIZE ) ) + Map factories = new HashMap<>(); + factories.put( "HEAP", HEAP ); + factories.put( "OFF_HEAP", OFF_HEAP ); + factories.put( "AUTO", AUTO ); + factories.put( "CHUNKED_FIXED_SIZE", CHUNKED_FIXED_SIZE ); + factories.put( "autoWithPageCacheFallback", autoWithPageCacheFallback( pageCache, dir ) ); + factories.put( "PageCachedNumberArrayFactory", new PageCachedNumberArrayFactory( pageCache, dir ) ); + for ( Map.Entry entry : factories.entrySet() ) { + String name = entry.getKey() + " => "; + NumberArrayFactory factory = entry.getValue(); list.add( line( + name + "IntArray", factory.newIntArray( INDEXES, -1 ), (random) -> random.nextInt( 1_000_000_000 ), (array, index, value) -> array.set( index, (Integer) value ), (array, index) -> array.get( index ) ) ); list.add( line( + name + "DynamicIntArray", factory.newDynamicIntArray( INDEXES / 100, -1 ), (random) -> random.nextInt( 1_000_000_000 ), (array, index, value) -> array.set( index, (Integer) value ), (array, index) -> array.get( index ) ) ); list.add( line( + name + "LongArray", factory.newLongArray( INDEXES, -1 ), (random) -> random.nextLong( 1_000_000_000 ), (array, index, value) -> array.set( index, (Long) value ), (array, index) -> array.get( index ) ) ); list.add( line( + name + "DynamicLongArray", factory.newDynamicLongArray( INDEXES / 100, -1 ), (random) -> random.nextLong( 1_000_000_000 ), (array, index, value) -> array.set( index, (Long) value ), (array, index) -> array.get( index ) ) ); list.add( line( + name + "ByteArray", factory.newByteArray( INDEXES, new byte[] {-1, -1, -1, -1, -1} ), (random) -> random.nextInt( 1_000_000_000 ), (array, index, value) -> array.setInt( index, 1, (Integer) value ), (array, index) -> array.getInt( index, 1 ) ) ); list.add( line( + name + "DynamicByteArray", factory.newDynamicByteArray( INDEXES / 100, new byte[] {-1, -1, -1, -1, -1} ), (random) -> random.nextInt( 1_000_000_000 ), (array, index, value) -> array.setInt( index, 1, (Integer) value ), @@ -101,24 +125,32 @@ public static Collection arrays() return list; } - private static > Object[] line( N array, Function valueGenerator, + private static > Object[] line( String name, N array, Function valueGenerator, Writer writer, Reader reader ) { - return new Object[] {array, valueGenerator, writer, reader}; + return new Object[] {name, array, valueGenerator, writer, reader}; + } + + @AfterClass + public static void closeFixture() throws Exception + { + fixture.close(); } @Rule public RandomRule random = new RandomRule(); @Parameter( 0 ) - public NumberArray array; + public String name; @Parameter( 1 ) + public NumberArray array; + @Parameter( 2 ) public Function valueGenerator; @SuppressWarnings( "rawtypes" ) - @Parameter( 2 ) + @Parameter( 3 ) public Writer writer; @SuppressWarnings( "rawtypes" ) - @Parameter( 3 ) + @Parameter( 4 ) public Reader reader; @SuppressWarnings( "unchecked" ) @@ -142,7 +174,7 @@ public void shouldGetAndSetRandomItems() throws Exception for ( int index = 0; index < INDEXES; index++ ) { Object value = reader.read( index % 2 == 0 ? array : array.at( index ), index ); - Object expectedValue = key.containsKey( index ) ? key.get( index ) : defaultValue; + Object expectedValue = key.getOrDefault( index, defaultValue ); assertEquals( expectedValue, value ); } } diff --git a/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/cache/PageCacheLongArrayTest.java b/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/cache/PageCacheLongArrayTest.java index f213769bfeaf7..de9b2ddd7449b 100644 --- a/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/cache/PageCacheLongArrayTest.java +++ b/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/cache/PageCacheLongArrayTest.java @@ -23,81 +23,75 @@ import org.junit.Test; import org.junit.rules.RuleChain; -import java.nio.file.StandardOpenOption; +import java.io.File; import org.neo4j.io.pagecache.PageCache; -import org.neo4j.io.pagecache.PageSwapperFactory; -import org.neo4j.io.pagecache.impl.SingleFilePageSwapperFactory; -import org.neo4j.io.pagecache.impl.muninn.MuninnPageCache; -import org.neo4j.io.pagecache.tracing.PageCacheTracer; -import org.neo4j.io.pagecache.tracing.cursor.PageCursorTracerSupplier; +import org.neo4j.io.pagecache.PagedFile; +import org.neo4j.test.rule.PageCacheRule; import org.neo4j.test.rule.RandomRule; import org.neo4j.test.rule.TestDirectory; import org.neo4j.test.rule.fs.DefaultFileSystemRule; -import static java.lang.System.currentTimeMillis; -import static org.neo4j.io.ByteUnit.mebiBytes; -import static org.neo4j.unsafe.impl.batchimport.cache.PageCacheNumberArray.PAGE_SIZE; +import static java.nio.file.StandardOpenOption.CREATE; +import static java.nio.file.StandardOpenOption.DELETE_ON_CLOSE; +import static org.junit.Assert.assertEquals; public class PageCacheLongArrayTest { - private static final int COUNT = 100_000_000; + private static final int COUNT = 1_000_000; private final DefaultFileSystemRule fs = new DefaultFileSystemRule(); private final TestDirectory dir = TestDirectory.testDirectory(); private final RandomRule random = new RandomRule(); + private final PageCacheRule pageCacheRule = new PageCacheRule(); @Rule - public final RuleChain ruleChain = RuleChain.outerRule( fs ).around( dir ).around( random ); + public final RuleChain ruleChain = RuleChain.outerRule( fs ).around( dir ).around( random ).around( pageCacheRule ); @Test - public void shouldTest() throws Exception + public void verifyPageCacheLongArray() throws Exception { - PageSwapperFactory swapper = new SingleFilePageSwapperFactory(); - swapper.setFileSystemAbstraction( fs ); - int pageSize = (int) mebiBytes( 1 ); - try ( PageCache pageCache = new MuninnPageCache( swapper, 1_000, pageSize, PageCacheTracer.NULL, - PageCursorTracerSupplier.NULL ); - LongArray array = new PageCacheLongArray( pageCache.map( dir.file( "file" ), pageSize, - StandardOpenOption.CREATE, StandardOpenOption.WRITE, StandardOpenOption.READ, - StandardOpenOption.DELETE_ON_CLOSE ) ) ) + PageCache pageCache = pageCacheRule.getPageCache( fs ); + PagedFile file = pageCache.map( dir.file( "file" ), pageCache.pageSize(), CREATE, DELETE_ON_CLOSE ); + try ( LongArray array = new PageCacheLongArray( file, COUNT, 0, 0 ) ) { - test( array ); + verifyBehaviour( array ); } } @Test - public void shouldSKjdks() throws Exception + public void verifyChunkingArrayWithPageCacheLongArray() throws Exception { - LongArray array = NumberArrayFactory.AUTO.newDynamicLongArray( PAGE_SIZE / Long.BYTES, 0 ); - test( array ); + PageCache pageCache = pageCacheRule.getPageCache( fs ); + File directory = dir.directory(); + NumberArrayFactory numberArrayFactory = NumberArrayFactory.autoWithPageCacheFallback( pageCache, directory ); + try ( LongArray array = numberArrayFactory.newDynamicLongArray( COUNT / 1_000, 0 ) ) + { + verifyBehaviour( array ); + } } - private void test( LongArray array ) + private void verifyBehaviour( LongArray array ) { - long time = currentTimeMillis(); + // insert for ( int i = 0; i < COUNT; i++ ) { array.set( i, i ); } - long insertTime = currentTimeMillis() - time; - time = currentTimeMillis(); + + // verify inserted data for ( int i = 0; i < COUNT; i++ ) { - array.get( i ); + assertEquals( i, array.get( i ) ); } - long scanTime = currentTimeMillis() - time; + // verify inserted data with random access patterns int stride = 12_345_678; int next = random.nextInt( COUNT ); - time = currentTimeMillis(); for ( int i = 0; i < COUNT; i++ ) { - array.get( next ); + assertEquals( next, array.get( next ) ); next = (next + stride) % COUNT; } - long randomTime = currentTimeMillis() - time; - - System.out.println( "insert:" + insertTime + ", scan:" + scanTime + ", random:" + randomTime ); } } diff --git a/community/neo4j/src/test/java/schema/MultipleIndexPopulationStressIT.java b/community/neo4j/src/test/java/schema/MultipleIndexPopulationStressIT.java index ce3d08d076a41..b8e676160a4bd 100644 --- a/community/neo4j/src/test/java/schema/MultipleIndexPopulationStressIT.java +++ b/community/neo4j/src/test/java/schema/MultipleIndexPopulationStressIT.java @@ -62,6 +62,7 @@ import org.neo4j.unsafe.impl.batchimport.BatchImporter; import org.neo4j.unsafe.impl.batchimport.InputIterable; import org.neo4j.unsafe.impl.batchimport.ParallelBatchImporter; +import org.neo4j.unsafe.impl.batchimport.cache.NumberArrayFactory; import org.neo4j.unsafe.impl.batchimport.cache.idmapping.IdGenerator; import org.neo4j.unsafe.impl.batchimport.cache.idmapping.IdGenerators; import org.neo4j.unsafe.impl.batchimport.cache.idmapping.IdMapper; @@ -384,7 +385,7 @@ public InputIterable nodes() } @Override - public IdMapper idMapper() + public IdMapper idMapper( NumberArrayFactory numberArrayFactory ) { return IdMappers.actual(); } diff --git a/stresstests/src/test/java/org/neo4j/kernel/stresstests/transaction/checkpoint/NodeCountInputs.java b/stresstests/src/test/java/org/neo4j/kernel/stresstests/transaction/checkpoint/NodeCountInputs.java index a8be271e7b761..5653b48dd9f06 100644 --- a/stresstests/src/test/java/org/neo4j/kernel/stresstests/transaction/checkpoint/NodeCountInputs.java +++ b/stresstests/src/test/java/org/neo4j/kernel/stresstests/transaction/checkpoint/NodeCountInputs.java @@ -21,6 +21,7 @@ import org.neo4j.unsafe.impl.batchimport.InputIterable; import org.neo4j.unsafe.impl.batchimport.InputIterator; +import org.neo4j.unsafe.impl.batchimport.cache.NumberArrayFactory; import org.neo4j.unsafe.impl.batchimport.cache.idmapping.IdGenerator; import org.neo4j.unsafe.impl.batchimport.cache.idmapping.IdGenerators; import org.neo4j.unsafe.impl.batchimport.cache.idmapping.IdMapper; @@ -103,7 +104,7 @@ public boolean supportsMultiplePasses() } @Override - public IdMapper idMapper() + public IdMapper idMapper( NumberArrayFactory numberArrayFactory ) { return IdMappers.actual(); }