diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/CountGroupsStage.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/CountGroupsStage.java index 8c078d96d55ec..4f00e581206cc 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/CountGroupsStage.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/CountGroupsStage.java @@ -21,6 +21,7 @@ import org.neo4j.kernel.impl.store.RecordStore; import org.neo4j.kernel.impl.store.record.RelationshipGroupRecord; +import org.neo4j.unsafe.impl.batchimport.staging.BatchFeedStep; import org.neo4j.unsafe.impl.batchimport.staging.ReadRecordsStep; import org.neo4j.unsafe.impl.batchimport.staging.Stage; @@ -42,8 +43,8 @@ public CountGroupsStage( Configuration config, RecordStore( control(), config, store, allIn( store, config ) ) ); + add( new BatchFeedStep( control(), config, allIn( store, config ), store.getRecordSize() ) ); + add( new ReadRecordsStep<>( control(), config, store ) ); add( new CountGroupsStep( control(), config, groupCache ) ); } } diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/NodeCountsStage.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/NodeCountsStage.java index 172cb8eb01c8b..c63aace407957 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/NodeCountsStage.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/NodeCountsStage.java @@ -22,6 +22,7 @@ import org.neo4j.kernel.impl.api.CountsAccessor; import org.neo4j.kernel.impl.store.NodeStore; import org.neo4j.unsafe.impl.batchimport.cache.NodeLabelsCache; +import org.neo4j.unsafe.impl.batchimport.staging.BatchFeedStep; import org.neo4j.unsafe.impl.batchimport.staging.ReadRecordsStep; import org.neo4j.unsafe.impl.batchimport.staging.Stage; import org.neo4j.unsafe.impl.batchimport.stats.StatsProvider; @@ -38,7 +39,8 @@ public NodeCountsStage( Configuration config, NodeLabelsCache cache, NodeStore n int highLabelId, CountsAccessor.Updater countsUpdater, StatsProvider... additionalStatsProviders ) { super( "Node counts", config ); - add( new ReadRecordsStep<>( control(), config, nodeStore, allIn( nodeStore, config ) ) ); + add( new BatchFeedStep( control(), config, allIn( nodeStore, config ), nodeStore.getRecordSize() ) ); + add( new ReadRecordsStep<>( control(), config, nodeStore ) ); add( new RecordProcessorStep<>( control(), "COUNT", config, new NodeCountsProcessor( nodeStore, cache, highLabelId, countsUpdater ), true, additionalStatsProviders ) ); } diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/NodeFirstGroupStage.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/NodeFirstGroupStage.java index 18932f55792ef..87108011f42a4 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/NodeFirstGroupStage.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/NodeFirstGroupStage.java @@ -23,6 +23,7 @@ import org.neo4j.kernel.impl.store.record.NodeRecord; import org.neo4j.kernel.impl.store.record.RelationshipGroupRecord; import org.neo4j.unsafe.impl.batchimport.cache.ByteArray; +import org.neo4j.unsafe.impl.batchimport.staging.BatchFeedStep; import org.neo4j.unsafe.impl.batchimport.staging.ReadRecordsStep; import org.neo4j.unsafe.impl.batchimport.staging.Stage; @@ -38,7 +39,8 @@ public NodeFirstGroupStage( Configuration config, RecordStore nodeStore, ByteArray cache ) { super( "Node --> Group", config ); - add( new ReadRecordsStep<>( control(), config, groupStore, allIn( groupStore, config ) ) ); + add( new BatchFeedStep( control(), config, allIn( groupStore, config ), groupStore.getRecordSize() ) ); + add( new ReadRecordsStep<>( control(), config, groupStore ) ); add( new NodeSetFirstGroupStep( control(), config, nodeStore, cache ) ); add( new UpdateRecordsStep<>( control(), config, nodeStore ) ); } diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/ProcessRelationshipCountsDataStep.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/ProcessRelationshipCountsDataStep.java index 8ada2a428e193..00cbf2ef8bd9a 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/ProcessRelationshipCountsDataStep.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/ProcessRelationshipCountsDataStep.java @@ -23,6 +23,7 @@ import java.util.concurrent.ConcurrentHashMap; import org.neo4j.kernel.impl.api.CountsAccessor; +import org.neo4j.kernel.impl.store.record.RelationshipRecord; import org.neo4j.unsafe.impl.batchimport.cache.NodeLabelsCache; import org.neo4j.unsafe.impl.batchimport.cache.NumberArrayFactory; import org.neo4j.unsafe.impl.batchimport.staging.BatchSender; @@ -30,10 +31,10 @@ import org.neo4j.unsafe.impl.batchimport.staging.StageControl; /** - * Processes relationship count data received from {@link ReadRelationshipCountsDataStep} and keeps - * the accumulated counts per thread. Aggregated when {@link #done()}. + * Processes relationship records, feeding them to {@link RelationshipCountsProcessor} which keeps + * the accumulated counts per thread. Aggregated in {@link #done()}. */ -public class ProcessRelationshipCountsDataStep extends ProcessorStep +public class ProcessRelationshipCountsDataStep extends ProcessorStep { private final NodeLabelsCache cache; private final Map processors = new ConcurrentHashMap<>(); @@ -55,12 +56,13 @@ public ProcessRelationshipCountsDataStep( StageControl control, NodeLabelsCache } @Override - protected void process( long[] batch, BatchSender sender ) + protected void process( RelationshipRecord[] batch, BatchSender sender ) { RelationshipCountsProcessor processor = processor(); for ( int i = 0; i < batch.length; i++ ) { - processor.process( batch[i++], (int)batch[i++], batch[i] ); + RelationshipRecord relationship = batch[i]; + processor.process( relationship.getFirstNode(), relationship.getType(), relationship.getSecondNode() ); } } diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/ReadRelationshipCountsDataStep.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/ReadRelationshipCountsDataStep.java deleted file mode 100644 index 00f75cb95d627..0000000000000 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/ReadRelationshipCountsDataStep.java +++ /dev/null @@ -1,74 +0,0 @@ -/* - * Copyright (c) 2002-2017 "Neo Technology," - * Network Engine for Objects in Lund AB [http://neotechnology.com] - * - * This file is part of Neo4j. - * - * Neo4j is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program. If not, see . - */ -package org.neo4j.unsafe.impl.batchimport; - -import java.util.Arrays; - -import org.neo4j.kernel.impl.store.RelationshipStore; -import org.neo4j.kernel.impl.store.record.RelationshipRecord; -import org.neo4j.unsafe.impl.batchimport.staging.ReadRecordsStep; -import org.neo4j.unsafe.impl.batchimport.staging.StageControl; - -import static org.neo4j.unsafe.impl.batchimport.RecordIdIterator.allIn; - -/** - * Reads from {@link RelationshipStore} and produces batches of startNode,type,endNode values for - * others to process. The result is one long[] with all values in. - */ -public class ReadRelationshipCountsDataStep extends ReadRecordsStep -{ - private long id = -1; - private final long highestId; - - public ReadRelationshipCountsDataStep( StageControl control, Configuration config, RelationshipStore store ) - { - super( control, config, store, allIn( store, config ) ); - this.highestId = highId - 1; - } - - @Override - protected long[] nextBatchOrNull( long ticket, int batchSize ) - { - if ( id >= highestId ) - { - return null; - } - - long[] batch = new long[batchSize * 3]; // start node, type, end node = 3 - int i = 0; - for ( ; i < batchSize && ++id <= highestId; ) - { - if ( cursor.next( id ) ) - { - int index = i++ * 3; - batch[index++] = record.getFirstNode(); - batch[index++] = record.getType(); - batch[index++] = record.getSecondNode(); - } - } - return i == batchSize ? batch : Arrays.copyOf( batch, i * 3 ); - } - - @Override - protected long position() - { - return doneBatches.get() * batchSize * store.getRecordSize(); - } -} diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/RelationshipCountsStage.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/RelationshipCountsStage.java index f397bd7a5a629..bb9af90141ab4 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/RelationshipCountsStage.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/RelationshipCountsStage.java @@ -23,20 +23,27 @@ import org.neo4j.kernel.impl.store.RelationshipStore; import org.neo4j.unsafe.impl.batchimport.cache.NodeLabelsCache; import org.neo4j.unsafe.impl.batchimport.cache.NumberArrayFactory; +import org.neo4j.unsafe.impl.batchimport.staging.BatchFeedStep; +import org.neo4j.unsafe.impl.batchimport.staging.ReadRecordsStep; import org.neo4j.unsafe.impl.batchimport.staging.Stage; +import static org.neo4j.unsafe.impl.batchimport.RecordIdIterator.allIn; + /** * Reads all records from {@link RelationshipStore} and process the counts in them. Uses a {@link NodeLabelsCache} * previously populated by f.ex {@link NodeCountsStage}. */ public class RelationshipCountsStage extends Stage { - public RelationshipCountsStage( Configuration config, NodeLabelsCache cache, RelationshipStore relationshipStore, + public RelationshipCountsStage( Configuration config, NodeLabelsCache cache, + RelationshipStore relationshipStore, int highLabelId, int highRelationshipTypeId, CountsAccessor.Updater countsUpdater, NumberArrayFactory cacheFactory ) { super( "Relationship counts", config ); - add( new ReadRelationshipCountsDataStep( control(), config, relationshipStore ) ); + add( new BatchFeedStep( control(), config, allIn( relationshipStore, config ), + relationshipStore.getRecordSize() ) ); + add( new ReadRecordsStep<>( control(), config, relationshipStore ) ); add( new ProcessRelationshipCountsDataStep( control(), cache, config, highLabelId, highRelationshipTypeId, countsUpdater, cacheFactory ) ); } diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/RelationshipLinkbackStage.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/RelationshipLinkbackStage.java index cfdc4a7b7ac78..6cb182aa42e5e 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/RelationshipLinkbackStage.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/RelationshipLinkbackStage.java @@ -22,8 +22,10 @@ import org.neo4j.kernel.impl.store.RelationshipStore; import org.neo4j.kernel.impl.store.record.RelationshipRecord; import org.neo4j.unsafe.impl.batchimport.cache.NodeRelationshipCache; +import org.neo4j.unsafe.impl.batchimport.staging.BatchFeedStep; import org.neo4j.unsafe.impl.batchimport.staging.ReadRecordsStep; import org.neo4j.unsafe.impl.batchimport.staging.Stage; + import static org.neo4j.unsafe.impl.batchimport.RecordIdIterator.backwards; /** @@ -48,8 +50,9 @@ public RelationshipLinkbackStage( String topic, Configuration config, Relationsh NodeRelationshipCache cache, long lowRelationshipId, long highRelationshipId, int nodeTypes ) { super( "Relationship --> Relationship" + topic, config ); - add( new ReadRecordsStep<>( control(), config, store, - backwards( lowRelationshipId, highRelationshipId, config ) ) ); + add( new BatchFeedStep( control(), config, backwards( lowRelationshipId, highRelationshipId, config ), + store.getRecordSize()) ); + add( new ReadRecordsStep<>( control(), config, store ) ); add( new RelationshipLinkbackStep( control(), config, cache, nodeTypes ) ); add( new UpdateRecordsStep<>( control(), config, store ) ); } diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/ScanAndCacheGroupsStage.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/ScanAndCacheGroupsStage.java index 508fbd8b15d39..c02b86a5a1724 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/ScanAndCacheGroupsStage.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/ScanAndCacheGroupsStage.java @@ -21,6 +21,7 @@ import org.neo4j.kernel.impl.store.RecordStore; import org.neo4j.kernel.impl.store.record.RelationshipGroupRecord; +import org.neo4j.unsafe.impl.batchimport.staging.BatchFeedStep; import org.neo4j.unsafe.impl.batchimport.staging.ReadRecordsStep; import org.neo4j.unsafe.impl.batchimport.staging.Stage; @@ -41,7 +42,8 @@ public ScanAndCacheGroupsStage( Configuration config, RecordStore( control(), config, store, allInReversed( store, config ) ) ); + add( new BatchFeedStep( control(), config, allInReversed( store, config ), store.getRecordSize() ) ); + add( new ReadRecordsStep<>( control(), config, store ) ); add( new CacheGroupsStep( control(), config, cache ) ); } } diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/BatchFeedStep.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/BatchFeedStep.java new file mode 100644 index 0000000000000..3020edc3c090b --- /dev/null +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/BatchFeedStep.java @@ -0,0 +1,53 @@ +/* + * Copyright (c) 2002-2017 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ +package org.neo4j.unsafe.impl.batchimport.staging; + +import org.neo4j.unsafe.impl.batchimport.Configuration; +import org.neo4j.unsafe.impl.batchimport.RecordIdIterator; + +/** + * Releases batches of record ids to be read, potentially in parallel, by downstream batches. + */ +public class BatchFeedStep extends IoProducerStep +{ + private final RecordIdIterator ids; + private final int recordSize; + private volatile long count; + + public BatchFeedStep( StageControl control, Configuration config, RecordIdIterator ids, int recordSize ) + { + super( control, config ); + this.ids = ids; + this.recordSize = recordSize; + } + + @Override + protected Object nextBatchOrNull( long ticket, int batchSize ) + { + count += batchSize; + return ids.nextBatch(); + } + + @Override + protected long position() + { + return count * recordSize; + } +} diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/ReadRecordsStep.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/ReadRecordsStep.java index 7a3f419e7b93e..44ffba5611185 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/ReadRecordsStep.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/ReadRecordsStep.java @@ -27,10 +27,8 @@ import org.neo4j.kernel.impl.store.RecordStore; import org.neo4j.kernel.impl.store.id.validation.IdValidator; import org.neo4j.kernel.impl.store.record.AbstractBaseRecord; +import org.neo4j.kernel.impl.store.record.RecordLoad; import org.neo4j.unsafe.impl.batchimport.Configuration; -import org.neo4j.unsafe.impl.batchimport.RecordIdIterator; - -import static org.neo4j.kernel.impl.store.record.RecordLoad.CHECK; /** * Reads records from a {@link RecordStore} and sends batches of those records downstream. @@ -39,74 +37,55 @@ * * @param type of {@link AbstractBaseRecord} */ -public class ReadRecordsStep extends IoProducerStep +public class ReadRecordsStep extends ProcessorStep { protected final RecordStore store; - protected final RECORD record; - protected final RecordCursor cursor; - protected final long highId; - private final RecordIdIterator ids; private final Class klass; - private final int recordSize; - // volatile since written by processing threads and read by execution monitor - private volatile long count; + protected final int batchSize; @SuppressWarnings( "unchecked" ) - public ReadRecordsStep( StageControl control, Configuration config, RecordStore store, - RecordIdIterator ids ) + public ReadRecordsStep( StageControl control, Configuration config, RecordStore store ) { - super( control, config ); + super( control, ">", config, 0 ); this.store = store; - this.ids = ids; this.klass = (Class) store.newRecord().getClass(); - this.recordSize = store.getRecordSize(); - this.cursor = store.newRecordCursor( record = store.newRecord() ); - this.highId = store.getHighId(); + this.batchSize = config.batchSize(); } @Override public void start( int orderingGuarantees ) { - cursor.acquire( 0, CHECK ); - super.start( orderingGuarantees ); + super.start( orderingGuarantees | ORDER_SEND_DOWNSTREAM ); } - @SuppressWarnings( "unchecked" ) @Override - protected Object nextBatchOrNull( long ticket, int batchSize ) + protected void process( PrimitiveLongIterator idRange, BatchSender sender ) throws Throwable { - PrimitiveLongIterator ids; - while ( (ids = this.ids.nextBatch()) != null ) + if ( !idRange.hasNext() ) + { + return; + } + + long id = idRange.next(); + RECORD record = store.newRecord(); + RECORD[] batch = (RECORD[]) Array.newInstance( klass, batchSize ); + int i = 0; + try ( RecordCursor cursor = store.newRecordCursor( record ).acquire( id, RecordLoad.CHECK ) ) { - RECORD[] batch = (RECORD[]) Array.newInstance( klass, batchSize ); - int i = 0; - while ( ids.hasNext() ) + boolean hasNext = true; + while ( hasNext ) { - if ( cursor.next( ids.next() ) && !IdValidator.isReservedId( record.getId() ) ) + if ( cursor.next( id ) && !IdValidator.isReservedId( id ) ) { batch[i++] = (RECORD) record.clone(); } - } - - if ( i > 0 ) - { - count += i; - return i == batchSize ? batch : Arrays.copyOf( batch, i ); + if ( hasNext = idRange.hasNext() ) + { + id = idRange.next(); + } } } - return null; - } - - @Override - public void close() throws Exception - { - super.close(); - cursor.close(); - } - @Override - protected long position() - { - return count * recordSize; + sender.send( i == batchSize ? batch : Arrays.copyOf( batch, i ) ); } } diff --git a/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/StoreWithReservedId.java b/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/StoreWithReservedId.java index 1b40ef7a3b27a..2224caec98462 100644 --- a/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/StoreWithReservedId.java +++ b/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/StoreWithReservedId.java @@ -89,6 +89,7 @@ private static RecordCursor newReservedIdRetur record.setId( realId ); return true; } ); + when( cursor.acquire( anyInt(), any() ) ).thenReturn( cursor ); return cursor; } } diff --git a/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/staging/ReadRecordsStepTest.java b/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/staging/ReadRecordsStepTest.java deleted file mode 100644 index 5413c308b6540..0000000000000 --- a/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/staging/ReadRecordsStepTest.java +++ /dev/null @@ -1,167 +0,0 @@ -/* - * Copyright (c) 2002-2017 "Neo Technology," - * Network Engine for Objects in Lund AB [http://neotechnology.com] - * - * This file is part of Neo4j. - * - * Neo4j is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program. If not, see . - */ -package org.neo4j.unsafe.impl.batchimport.staging; - -import org.junit.Test; - -import java.util.Arrays; -import java.util.function.Predicate; -import java.util.stream.Stream; - -import org.neo4j.kernel.impl.store.NodeStore; -import org.neo4j.kernel.impl.store.RecordCursor; -import org.neo4j.kernel.impl.store.id.IdGeneratorImpl; -import org.neo4j.kernel.impl.store.record.AbstractBaseRecord; -import org.neo4j.kernel.impl.store.record.NodeRecord; -import org.neo4j.kernel.impl.store.record.RecordLoad; -import org.neo4j.unsafe.impl.batchimport.StoreWithReservedId; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; -import static org.mockito.Matchers.any; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; -import static org.neo4j.unsafe.impl.batchimport.Configuration.DEFAULT; -import static org.neo4j.unsafe.impl.batchimport.Configuration.withBatchSize; -import static org.neo4j.unsafe.impl.batchimport.RecordIdIterator.allIn; - -public class ReadRecordsStepTest -{ - @Test - public void reservedIdIsSkipped() - { - long highId = 5; - int batchSize = (int) highId; - org.neo4j.unsafe.impl.batchimport.Configuration config = withBatchSize( DEFAULT, batchSize ); - NodeStore store = StoreWithReservedId.newNodeStoreMock( highId ); - when( store.getHighId() ).thenReturn( highId ); - when( store.getRecordsPerPage() ).thenReturn( 10 ); - - ReadRecordsStep step = new ReadRecordsStep<>( mock( StageControl.class ), config, - store, allIn( store, config ) ); - step.start( 0 ); - - Object batch = step.nextBatchOrNull( 0, batchSize ); - - assertNotNull( batch ); - - NodeRecord[] records = (NodeRecord[]) batch; - boolean hasRecordWithReservedId = Stream.of( records ).anyMatch( recordWithReservedId() ); - assertFalse( "Batch contains record with reserved id " + Arrays.toString( records ), hasRecordWithReservedId ); - } - - @Test - public void shouldContinueThroughBigIdHoles() throws Exception - { - // GIVEN - NodeStore store = mock( NodeStore.class ); - long highId = 100L; - when( store.getHighId() ).thenReturn( highId ); - when( store.newRecord() ).thenReturn( new NodeRecord( -1 ) ); - org.neo4j.unsafe.impl.batchimport.Configuration config = withBatchSize( DEFAULT, 10 ); - when( store.newRecordCursor( any( NodeRecord.class ) ) ).thenAnswer( invocation -> - { - return new ControlledRecordCursor<>( (NodeRecord) invocation.getArguments()[0], record -> - { - record.setInUse( - record.getId() < config.batchSize() || record.getId() >= highId - config.batchSize() / 2 ); - return record.inUse() && record.getId() < highId; - } ); - } ); - ReadRecordsStep step = new ReadRecordsStep<>( mock( StageControl.class ), - config, store, allIn( store, config ) ); - step.start( 0 ); - - // WHEN - NodeRecord[] first = (NodeRecord[]) step.nextBatchOrNull( 0, config.batchSize() ); - NodeRecord[] second = (NodeRecord[]) step.nextBatchOrNull( 1, config.batchSize() ); - NodeRecord[] third = (NodeRecord[]) step.nextBatchOrNull( 2, config.batchSize() ); - - // THEN - assertEquals( config.batchSize(), first.length ); - assertEquals( 0L, first[0].getId() ); - assertEquals( first[0].getId() + config.batchSize() - 1, first[first.length - 1].getId() ); - - assertEquals( config.batchSize() / 2, second.length ); - assertEquals( highId - 1, second[second.length - 1].getId() ); - - assertNull( third ); - } - - private static Predicate recordWithReservedId() - { - return record -> record.getId() == IdGeneratorImpl.INTEGER_MINUS_ONE; - } - - private static class ControlledRecordCursor implements RecordCursor - { - private final RECORD record; - private final Predicate populator; - - ControlledRecordCursor( RECORD record, Predicate populator ) - { - this.record = record; - this.populator = populator; - } - - @Override - public void close() - { // Nothing to close - } - - @Override - public RECORD get() - { - return record; - } - - @Override - public RecordCursor acquire( long id, RecordLoad mode ) - { - return this; - } - - @Override - public void placeAt( long id, RecordLoad mode ) - { // Don't care about this, we only care about next(id) in this class - } - - @Override - public boolean next() - { - throw new UnsupportedOperationException(); - } - - @Override - public boolean next( long id ) - { - record.setId( id ); - return populator.test( record ); - } - - @Override - public boolean next( long id, RECORD record, RecordLoad mode ) - { - throw new UnsupportedOperationException(); - } - } -} diff --git a/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/store/BatchingIdSequenceTest.java b/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/store/BatchingIdSequenceTest.java index a4354bb95b0ce..34d1c5a490c5c 100644 --- a/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/store/BatchingIdSequenceTest.java +++ b/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/store/BatchingIdSequenceTest.java @@ -22,12 +22,14 @@ import org.junit.Test; import org.neo4j.kernel.impl.store.id.IdGeneratorImpl; +import org.neo4j.kernel.impl.store.id.IdRange; +import org.neo4j.kernel.impl.store.id.validation.IdValidator; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; public class BatchingIdSequenceTest { - @Test public void ShouldSkipNullId() throws Exception { @@ -67,4 +69,32 @@ public void resetShouldSetDefault() throws Exception assertEquals( 0L, idSequence.nextId() ); assertEquals( 1L, idSequence.peek() ); } + + @Test + public void shouldSkipReservedIdWhenGettingBatches() throws Exception + { + // GIVEN + int batchSize = 10; + BatchingIdSequence idSequence = new BatchingIdSequence( + IdGeneratorImpl.INTEGER_MINUS_ONE - batchSize - batchSize/2 ); + + // WHEN + IdRange range1 = idSequence.nextIdBatch( batchSize ); + IdRange range2 = idSequence.nextIdBatch( batchSize ); + + // THEN + assertNoReservedId( range1 ); + assertNoReservedId( range2 ); + } + + private void assertNoReservedId( IdRange range ) + { + for ( long id : range.getDefragIds() ) + { + assertFalse( IdValidator.isReservedId( id ) ); + } + + assertFalse( IdValidator.hasReservedIdInRange( range.getRangeStart(), + range.getRangeStart() + range.getRangeLength() ) ); + } }