From 1c79e85a0255a06fd83e7ba3f0a5c26340b5a1a1 Mon Sep 17 00:00:00 2001 From: lutovich Date: Mon, 11 Apr 2016 14:24:50 +0200 Subject: [PATCH] Make sure reserved ID is skipped in batch importer Parallel batch importer assigns/tracks record ids using its own id sequence. The sequence is aware of the special reserved id IdGeneratorImpl.INTEGER_MINUS_ONE. However, PBI also reads batches of records directly from the store using raw ids. This code was not aware of the reserved id and tried to read and write records with such an id. Doing this caused exceptions from the store layer. This commit makes sure records with reserved ids are never returned as part of a PBI's records batch. As a safety measure, it also makes sure a record with a reserved id is never written to the store. --- .../impl/batchimport/ReadNodeRecordsStep.java | 14 ++- .../ReadRelationshipRecordsBackwardsStep.java | 12 ++- .../impl/batchimport/UpdateRecordsStep.java | 16 ++-- .../batchimport/staging/ReadRecordsStep.java | 21 +++++ .../batchimport/ReadNodeRecordsStepTest.java | 63 +++++++++++++ ...dRelationshipRecordsBackwardsStepTest.java | 64 +++++++++++++ .../RelationshipLinkbackStageTest.java | 55 +++++++++++ .../impl/batchimport/StoreWithReservedId.java | 93 +++++++++++++++++++ .../batchimport/UpdateRecordsStepTest.java | 26 ++++++ 9 files changed, 353 insertions(+), 11 deletions(-) create mode 100644 community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/ReadNodeRecordsStepTest.java create mode 100644 community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/ReadRelationshipRecordsBackwardsStepTest.java create mode 100644 community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/RelationshipLinkbackStageTest.java create mode 100644 community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/StoreWithReservedId.java diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/ReadNodeRecordsStep.java 
b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/ReadNodeRecordsStep.java index ebe28727179ac..0fe87300a30c9 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/ReadNodeRecordsStep.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/ReadNodeRecordsStep.java @@ -20,6 +20,7 @@ package org.neo4j.unsafe.impl.batchimport; import org.neo4j.kernel.impl.store.NodeStore; +import org.neo4j.kernel.impl.store.id.validation.IdValidator; import org.neo4j.kernel.impl.store.record.NodeRecord; import org.neo4j.unsafe.impl.batchimport.staging.ReadRecordsStep; import org.neo4j.unsafe.impl.batchimport.staging.StageControl; @@ -28,7 +29,7 @@ /** * Reads from {@link NodeStore} and produces batches of {@link NodeRecord} for others to process. - * + *

* Future: Would be quite efficient just get a page cursor and read inUse+labelField and store * all labelField values of a batch in one long[] or similar, instead of passing on a NodeRecord[]. */ @@ -46,13 +47,20 @@ protected Object nextBatchOrNull( long ticket, int batchSize ) { int size = (int) min( batchSize, highId - id ); NodeRecord[] batch = new NodeRecord[size]; + boolean seenReservedId = false; + for ( int i = 0; i < size; i++ ) { // We don't want null in batch[i], a record, whether used or unused is what we want cursor.next( id++ ); - batch[i] = record.clone(); + NodeRecord newRecord = record.clone(); + batch[i] = newRecord; + seenReservedId |= IdValidator.isReservedId( newRecord.getId() ); } - return size > 0 ? batch : null; + + batch = removeRecordWithReservedId( batch, seenReservedId ); + + return batch.length > 0 ? batch : null; } @Override diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/ReadRelationshipRecordsBackwardsStep.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/ReadRelationshipRecordsBackwardsStep.java index a5bd3469bf031..6e1793d076ac0 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/ReadRelationshipRecordsBackwardsStep.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/ReadRelationshipRecordsBackwardsStep.java @@ -20,6 +20,7 @@ package org.neo4j.unsafe.impl.batchimport; import org.neo4j.kernel.impl.store.RelationshipStore; +import org.neo4j.kernel.impl.store.id.validation.IdValidator; import org.neo4j.kernel.impl.store.record.RelationshipRecord; import org.neo4j.unsafe.impl.batchimport.staging.ReadRecordsStep; import org.neo4j.unsafe.impl.batchimport.staging.StageControl; @@ -46,12 +47,19 @@ protected Object nextBatchOrNull( long ticket, int batchSize ) { int size = (int) min( batchSize, id ); RelationshipRecord[] batch = new RelationshipRecord[size]; + boolean seenReservedId = false; + for ( int i = 0; i < size; i++ ) { cursor.next( 
--id ); - batch[i] = record.clone(); + RelationshipRecord newRecord = record.clone(); + batch[i] = newRecord; + seenReservedId |= IdValidator.isReservedId( newRecord.getId() ); } - return size > 0 ? batch : null; + + batch = removeRecordWithReservedId( batch, seenReservedId ); + + return batch.length > 0 ? batch : null; } @Override diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/UpdateRecordsStep.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/UpdateRecordsStep.java index d4c0fb90bc3e1..9741feb6ae555 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/UpdateRecordsStep.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/UpdateRecordsStep.java @@ -22,6 +22,7 @@ import java.util.Collection; import org.neo4j.kernel.impl.store.RecordStore; +import org.neo4j.kernel.impl.store.id.validation.IdValidator; import org.neo4j.kernel.impl.store.record.AbstractBaseRecord; import org.neo4j.unsafe.impl.batchimport.staging.BatchSender; import org.neo4j.unsafe.impl.batchimport.staging.ProcessorStep; @@ -32,8 +33,8 @@ import org.neo4j.unsafe.impl.batchimport.stats.StatsProvider; /** - * Updates a batch of records to a store. Can be given a {@link Predicate} that can choose to not - * {@link Predicate#accept(Object) accept} a record, which will have that record be written as unused instead. + * Updates a batch of records to a store. Can have {@link #accept(AbstractBaseRecord)} overridden to not accept + * a record, which will have that record be written as unused instead. 
*/ public class UpdateRecordsStep extends ProcessorStep @@ -56,12 +57,15 @@ protected void process( RECORD[] batch, BatchSender sender ) throws Throwable { for ( RECORD record : batch ) { - if ( record.inUse() && !accept( record ) ) + if ( !IdValidator.isReservedId( record.getId() ) ) { - record = (RECORD) record.clone(); - record.setInUse( false ); + if ( record.inUse() && !accept( record ) ) + { + record = (RECORD) record.clone(); + record.setInUse( false ); + } + update( record ); } - update( record ); } recordsUpdated += batch.length; } diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/ReadRecordsStep.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/ReadRecordsStep.java index 3624861739c27..021e5218827c4 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/ReadRecordsStep.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/ReadRecordsStep.java @@ -19,8 +19,12 @@ */ package org.neo4j.unsafe.impl.batchimport.staging; +import java.lang.reflect.Array; +import java.util.stream.Stream; + import org.neo4j.kernel.impl.store.RecordCursor; import org.neo4j.kernel.impl.store.RecordStore; +import org.neo4j.kernel.impl.store.id.validation.IdValidator; import org.neo4j.kernel.impl.store.record.AbstractBaseRecord; import static org.neo4j.kernel.impl.store.record.RecordLoad.CHECK; @@ -60,4 +64,21 @@ public void close() throws Exception super.close(); cursor.close(); } + + protected RECORD[] removeRecordWithReservedId( RECORD[] records, boolean seenReservedId ) + { + if ( !seenReservedId ) + { + return records; + } + return Stream.of( records ) + .filter( record -> !IdValidator.isReservedId( record.getId() ) ) + .toArray( length -> newArray( length, records.getClass().getComponentType() ) ); + } + + @SuppressWarnings( "unchecked" ) + private RECORD[] newArray( int length, Class componentType ) + { + return (RECORD[]) Array.newInstance( 
componentType, length ); + } } diff --git a/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/ReadNodeRecordsStepTest.java b/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/ReadNodeRecordsStepTest.java new file mode 100644 index 0000000000000..eb805bd8a6913 --- /dev/null +++ b/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/ReadNodeRecordsStepTest.java @@ -0,0 +1,63 @@ +/* + * Copyright (c) 2002-2016 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. 
+ */ +package org.neo4j.unsafe.impl.batchimport; + +import org.junit.Test; + +import java.util.Arrays; +import java.util.function.Predicate; +import java.util.stream.Stream; + +import org.neo4j.kernel.impl.store.NodeStore; +import org.neo4j.kernel.impl.store.id.IdGeneratorImpl; +import org.neo4j.kernel.impl.store.record.NodeRecord; +import org.neo4j.unsafe.impl.batchimport.staging.StageControl; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class ReadNodeRecordsStepTest +{ + @Test + public void reservedIdIsSkipped() + { + long highId = 5; + int batchSize = (int) highId; + NodeStore store = StoreWithReservedId.newNodeStoreMock( highId ); + when( store.getHighId() ).thenReturn( highId ); + + ReadNodeRecordsStep step = new ReadNodeRecordsStep( mock( StageControl.class ), Configuration.DEFAULT, store ); + + Object batch = step.nextBatchOrNull( 0, batchSize ); + + assertNotNull( batch ); + + NodeRecord[] records = (NodeRecord[]) batch; + boolean hasRecordWithReservedId = Stream.of( records ).anyMatch( recordWithReservedId() ); + assertFalse( "Batch contains record with reserved id " + Arrays.toString( records ), hasRecordWithReservedId ); + } + + private static Predicate recordWithReservedId() + { + return record -> record.getId() == IdGeneratorImpl.INTEGER_MINUS_ONE; + } +} diff --git a/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/ReadRelationshipRecordsBackwardsStepTest.java b/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/ReadRelationshipRecordsBackwardsStepTest.java new file mode 100644 index 0000000000000..e03c3b03ea4e7 --- /dev/null +++ b/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/ReadRelationshipRecordsBackwardsStepTest.java @@ -0,0 +1,64 @@ +/* + * Copyright (c) 2002-2016 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * 
This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ +package org.neo4j.unsafe.impl.batchimport; + +import org.junit.Test; + +import java.util.Arrays; +import java.util.function.Predicate; +import java.util.stream.Stream; + +import org.neo4j.kernel.impl.store.RelationshipStore; +import org.neo4j.kernel.impl.store.id.IdGeneratorImpl; +import org.neo4j.kernel.impl.store.record.RelationshipRecord; +import org.neo4j.unsafe.impl.batchimport.staging.StageControl; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class ReadRelationshipRecordsBackwardsStepTest +{ + @Test + public void reservedIdIsSkipped() + { + long highId = 5; + int batchSize = (int) highId; + RelationshipStore store = StoreWithReservedId.newRelationshipStoreMock( highId ); + when( store.getHighId() ).thenReturn( highId ); + + ReadRelationshipRecordsBackwardsStep step = new ReadRelationshipRecordsBackwardsStep( + mock( StageControl.class ), Configuration.DEFAULT, store ); + + Object batch = step.nextBatchOrNull( 0, batchSize ); + + assertNotNull( batch ); + + RelationshipRecord[] records = (RelationshipRecord[]) batch; + boolean hasRecordWithReservedId = Stream.of( records ).anyMatch( recordWithReservedId() ); + assertFalse( "Batch contains record with reserved id " + 
Arrays.toString( records ), hasRecordWithReservedId ); + } + + private static Predicate recordWithReservedId() + { + return record -> record.getId() == IdGeneratorImpl.INTEGER_MINUS_ONE; + } +} diff --git a/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/RelationshipLinkbackStageTest.java b/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/RelationshipLinkbackStageTest.java new file mode 100644 index 0000000000000..ce6d45ad3d6de --- /dev/null +++ b/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/RelationshipLinkbackStageTest.java @@ -0,0 +1,55 @@ +/* + * Copyright (c) 2002-2016 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. 
+ */ +package org.neo4j.unsafe.impl.batchimport; + +import org.junit.Test; + +import org.neo4j.graphdb.factory.GraphDatabaseSettings; +import org.neo4j.kernel.impl.store.RelationshipStore; +import org.neo4j.kernel.impl.store.id.IdGeneratorImpl; +import org.neo4j.kernel.impl.store.record.RelationshipRecord; +import org.neo4j.unsafe.impl.batchimport.cache.NodeRelationshipCache; +import org.neo4j.unsafe.impl.batchimport.cache.NumberArrayFactory; +import org.neo4j.unsafe.impl.batchimport.staging.ExecutionMonitors; +import org.neo4j.unsafe.impl.batchimport.staging.ExecutionSupervisors; + +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; + +public class RelationshipLinkbackStageTest +{ + @Test + public void reservedIdIsSkipped() throws Exception + { + long highId = 5; + RelationshipStore store = StoreWithReservedId.newRelationshipStoreMock( highId ); + RelationshipLinkbackStage stage = new RelationshipLinkbackStage( Configuration.DEFAULT, store, newCache() ); + + ExecutionSupervisors.superviseExecution( ExecutionMonitors.invisible(), Configuration.DEFAULT, stage ); + + verify( store, never() ).updateRecord( new RelationshipRecord( IdGeneratorImpl.INTEGER_MINUS_ONE ) ); + } + + private static NodeRelationshipCache newCache() + { + int denseNodeThreshold = Integer.parseInt( GraphDatabaseSettings.dense_node_threshold.getDefaultValue() ); + return new NodeRelationshipCache( NumberArrayFactory.HEAP, denseNodeThreshold ); + } +} diff --git a/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/StoreWithReservedId.java b/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/StoreWithReservedId.java new file mode 100644 index 0000000000000..6e234205f1eb3 --- /dev/null +++ b/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/StoreWithReservedId.java @@ -0,0 +1,93 @@ +/* + * Copyright (c) 2002-2016 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part 
of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ +package org.neo4j.unsafe.impl.batchimport; + +import org.neo4j.kernel.impl.store.NodeStore; +import org.neo4j.kernel.impl.store.RecordCursor; +import org.neo4j.kernel.impl.store.RecordStore; +import org.neo4j.kernel.impl.store.RelationshipStore; +import org.neo4j.kernel.impl.store.id.IdGeneratorImpl; +import org.neo4j.kernel.impl.store.record.AbstractBaseRecord; +import org.neo4j.kernel.impl.store.record.NodeRecord; +import org.neo4j.kernel.impl.store.record.RelationshipRecord; + +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyInt; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public final class StoreWithReservedId +{ + private StoreWithReservedId() + { + } + + /** + * Create new {@link NodeStore} mock with {@link RecordCursor} that returns record with + * reserved id - {@link IdGeneratorImpl#INTEGER_MINUS_ONE}. + * + * @param highId the highId for the store mock + * @return new {@link NodeStore} mock + */ + public static NodeStore newNodeStoreMock( long highId ) + { + return newStoreMock( NodeStore.class, new NodeRecord( -1 ), highId ); + } + + /** + * Create new {@link RelationshipStore} mock with {@link RecordCursor} that returns record with + * reserved id - {@link IdGeneratorImpl#INTEGER_MINUS_ONE}. 
+ * + * @param highId the highId for the store mock + * @return new {@link RelationshipStore} mock + */ + public static RelationshipStore newRelationshipStoreMock( long highId ) + { + return newStoreMock( RelationshipStore.class, new RelationshipRecord( -1 ), highId ); + } + + private static > S newStoreMock( Class storeClass, + R record, long highId ) + { + S store = mock( storeClass ); + when( store.getHighId() ).thenReturn( highId ); + + when( store.newRecord() ).thenReturn( record ); + + RecordCursor cursor = newReservedIdReturningRecordCursor( highId, record ); + when( store.newRecordCursor( any() ) ).thenReturn( cursor ); + + return store; + } + + @SuppressWarnings( "unchecked" ) + private static RecordCursor newReservedIdReturningRecordCursor( long highId, + R record ) + { + RecordCursor cursor = mock( RecordCursor.class ); + when( cursor.next( anyInt() ) ).thenAnswer( invocation -> { + long id = (long) invocation.getArguments()[0]; + long realId = (id == highId - 1) ? IdGeneratorImpl.INTEGER_MINUS_ONE : id; + record.setId( realId ); + return true; + } ); + return cursor; + } +} diff --git a/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/UpdateRecordsStepTest.java b/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/UpdateRecordsStepTest.java index a841d1035951a..8d88df33e8e77 100644 --- a/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/UpdateRecordsStepTest.java +++ b/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/UpdateRecordsStepTest.java @@ -23,7 +23,9 @@ import java.util.Arrays; +import org.neo4j.kernel.impl.store.NodeStore; import org.neo4j.kernel.impl.store.RecordStore; +import org.neo4j.kernel.impl.store.id.IdGeneratorImpl; import org.neo4j.kernel.impl.store.record.NodeRecord; import org.neo4j.unsafe.impl.batchimport.staging.BatchSender; import org.neo4j.unsafe.impl.batchimport.staging.StageControl; @@ -33,6 +35,8 @@ import static org.hamcrest.Matchers.greaterThan; import static 
org.junit.Assert.assertThat; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; public class UpdateRecordsStepTest @@ -59,4 +63,26 @@ public void ioThroughputStatDoesNotOverflow() throws Throwable assertThat( stat.asLong(), greaterThan( 0L ) ); } + + @Test + public void recordWithReservedIdIsSkipped() throws Throwable + { + RecordStore store = mock( NodeStore.class ); + StageControl stageControl = mock( StageControl.class ); + UpdateRecordsStep step = new UpdateRecordsStep<>( stageControl, Configuration.DEFAULT, store ); + + NodeRecord node1 = new NodeRecord( 1 ); + NodeRecord node2 = new NodeRecord( 2 ); + NodeRecord nodeWithReservedId = new NodeRecord( IdGeneratorImpl.INTEGER_MINUS_ONE ); + NodeRecord[] batch = {node1, node2, nodeWithReservedId}; + + step.process( batch, mock( BatchSender.class ) ); + + verify( store ).prepareForCommit( node1 ); + verify( store ).updateRecord( node1 ); + verify( store ).prepareForCommit( node2 ); + verify( store ).updateRecord( node2 ); + verify( store, never() ).prepareForCommit( nodeWithReservedId ); + verify( store, never() ).updateRecord( nodeWithReservedId ); + } }