Skip to content

Commit

Permalink
Make sure reserved ID is skipped in batch importer
Browse files Browse the repository at this point in the history
Parallel batch importer assigns/tracks record ids using it's own id sequence.
Sequence is aware of the special reserved id IdGeneratorImpl.INTEGER_MINUS_ONE.
However, PBI also reads batches of records directly from the store using raw
ids. This code was not aware of the reserved id and tried to read and write
records with such id. Doing this caused exceptions from the store layer.

This commit makes sure records with reserved ids are never returned as part of
a PBI's records batch. As a safety, measure it also makes sure record with
reserved id is never written to the store.
  • Loading branch information
lutovich committed Apr 11, 2016
1 parent 1e1563a commit 1c79e85
Show file tree
Hide file tree
Showing 9 changed files with 353 additions and 11 deletions.
Expand Up @@ -20,6 +20,7 @@
package org.neo4j.unsafe.impl.batchimport;

import org.neo4j.kernel.impl.store.NodeStore;
import org.neo4j.kernel.impl.store.id.validation.IdValidator;
import org.neo4j.kernel.impl.store.record.NodeRecord;
import org.neo4j.unsafe.impl.batchimport.staging.ReadRecordsStep;
import org.neo4j.unsafe.impl.batchimport.staging.StageControl;
Expand All @@ -28,7 +29,7 @@

/**
* Reads from {@link NodeStore} and produces batches of {@link NodeRecord} for others to process.
*
* <p>
* Future: Would be quite efficient just get a page cursor and read inUse+labelField and store
* all labelField values of a batch in one long[] or similar, instead of passing on a NodeRecord[].
*/
Expand All @@ -46,13 +47,20 @@ protected Object nextBatchOrNull( long ticket, int batchSize )
{
int size = (int) min( batchSize, highId - id );
NodeRecord[] batch = new NodeRecord[size];
boolean seenReservedId = false;

for ( int i = 0; i < size; i++ )
{
// We don't want null in batch[i], a record, whether used or unused is what we want
cursor.next( id++ );
batch[i] = record.clone();
NodeRecord newRecord = record.clone();
batch[i] = newRecord;
seenReservedId |= IdValidator.isReservedId( newRecord.getId() );
}
return size > 0 ? batch : null;

batch = removeRecordWithReservedId( batch, seenReservedId );

return batch.length > 0 ? batch : null;
}

@Override
Expand Down
Expand Up @@ -20,6 +20,7 @@
package org.neo4j.unsafe.impl.batchimport;

import org.neo4j.kernel.impl.store.RelationshipStore;
import org.neo4j.kernel.impl.store.id.validation.IdValidator;
import org.neo4j.kernel.impl.store.record.RelationshipRecord;
import org.neo4j.unsafe.impl.batchimport.staging.ReadRecordsStep;
import org.neo4j.unsafe.impl.batchimport.staging.StageControl;
Expand All @@ -46,12 +47,19 @@ protected Object nextBatchOrNull( long ticket, int batchSize )
{
int size = (int) min( batchSize, id );
RelationshipRecord[] batch = new RelationshipRecord[size];
boolean seenReservedId = false;

for ( int i = 0; i < size; i++ )
{
cursor.next( --id );
batch[i] = record.clone();
RelationshipRecord newRecord = record.clone();
batch[i] = newRecord;
seenReservedId |= IdValidator.isReservedId( newRecord.getId() );
}
return size > 0 ? batch : null;

batch = removeRecordWithReservedId( batch, seenReservedId );

return batch.length > 0 ? batch : null;
}

@Override
Expand Down
Expand Up @@ -22,6 +22,7 @@
import java.util.Collection;

import org.neo4j.kernel.impl.store.RecordStore;
import org.neo4j.kernel.impl.store.id.validation.IdValidator;
import org.neo4j.kernel.impl.store.record.AbstractBaseRecord;
import org.neo4j.unsafe.impl.batchimport.staging.BatchSender;
import org.neo4j.unsafe.impl.batchimport.staging.ProcessorStep;
Expand All @@ -32,8 +33,8 @@
import org.neo4j.unsafe.impl.batchimport.stats.StatsProvider;

/**
* Updates a batch of records to a store. Can be given a {@link Predicate} that can choose to not
* {@link Predicate#accept(Object) accept} a record, which will have that record be written as unused instead.
* Updates a batch of records to a store. Can have {@link #accept(AbstractBaseRecord)} overwritten to not not accept
* a record, which will have that record be written as unused instead.
*/
public class UpdateRecordsStep<RECORD extends AbstractBaseRecord>
extends ProcessorStep<RECORD[]>
Expand All @@ -56,12 +57,15 @@ protected void process( RECORD[] batch, BatchSender sender ) throws Throwable
{
for ( RECORD record : batch )
{
if ( record.inUse() && !accept( record ) )
if ( !IdValidator.isReservedId( record.getId() ) )
{
record = (RECORD) record.clone();
record.setInUse( false );
if ( record.inUse() && !accept( record ) )
{
record = (RECORD) record.clone();
record.setInUse( false );
}
update( record );
}
update( record );
}
recordsUpdated += batch.length;
}
Expand Down
Expand Up @@ -19,8 +19,12 @@
*/
package org.neo4j.unsafe.impl.batchimport.staging;

import java.lang.reflect.Array;
import java.util.stream.Stream;

import org.neo4j.kernel.impl.store.RecordCursor;
import org.neo4j.kernel.impl.store.RecordStore;
import org.neo4j.kernel.impl.store.id.validation.IdValidator;
import org.neo4j.kernel.impl.store.record.AbstractBaseRecord;

import static org.neo4j.kernel.impl.store.record.RecordLoad.CHECK;
Expand Down Expand Up @@ -60,4 +64,21 @@ public void close() throws Exception
super.close();
cursor.close();
}

protected RECORD[] removeRecordWithReservedId( RECORD[] records, boolean seenReservedId )
{
if ( !seenReservedId )
{
return records;
}
return Stream.of( records )
.filter( record -> !IdValidator.isReservedId( record.getId() ) )
.toArray( length -> newArray( length, records.getClass().getComponentType() ) );
}

@SuppressWarnings( "unchecked" )
private RECORD[] newArray( int length, Class<?> componentType )
{
return (RECORD[]) Array.newInstance( componentType, length );
}
}
@@ -0,0 +1,63 @@
/*
* Copyright (c) 2002-2016 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.unsafe.impl.batchimport;

import org.junit.Test;

import java.util.Arrays;
import java.util.function.Predicate;
import java.util.stream.Stream;

import org.neo4j.kernel.impl.store.NodeStore;
import org.neo4j.kernel.impl.store.id.IdGeneratorImpl;
import org.neo4j.kernel.impl.store.record.NodeRecord;
import org.neo4j.unsafe.impl.batchimport.staging.StageControl;

import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

public class ReadNodeRecordsStepTest
{
@Test
public void reservedIdIsSkipped()
{
long highId = 5;
int batchSize = (int) highId;
NodeStore store = StoreWithReservedId.newNodeStoreMock( highId );
when( store.getHighId() ).thenReturn( highId );

ReadNodeRecordsStep step = new ReadNodeRecordsStep( mock( StageControl.class ), Configuration.DEFAULT, store );

Object batch = step.nextBatchOrNull( 0, batchSize );

assertNotNull( batch );

NodeRecord[] records = (NodeRecord[]) batch;
boolean hasRecordWithReservedId = Stream.of( records ).anyMatch( recordWithReservedId() );
assertFalse( "Batch contains record with reserved id " + Arrays.toString( records ), hasRecordWithReservedId );
}

private static Predicate<NodeRecord> recordWithReservedId()
{
return record -> record.getId() == IdGeneratorImpl.INTEGER_MINUS_ONE;
}
}
@@ -0,0 +1,64 @@
/*
* Copyright (c) 2002-2016 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.unsafe.impl.batchimport;

import org.junit.Test;

import java.util.Arrays;
import java.util.function.Predicate;
import java.util.stream.Stream;

import org.neo4j.kernel.impl.store.RelationshipStore;
import org.neo4j.kernel.impl.store.id.IdGeneratorImpl;
import org.neo4j.kernel.impl.store.record.RelationshipRecord;
import org.neo4j.unsafe.impl.batchimport.staging.StageControl;

import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

public class ReadRelationshipRecordsBackwardsStepTest
{
@Test
public void reservedIdIsSkipped()
{
long highId = 5;
int batchSize = (int) highId;
RelationshipStore store = StoreWithReservedId.newRelationshipStoreMock( highId );
when( store.getHighId() ).thenReturn( highId );

ReadRelationshipRecordsBackwardsStep step = new ReadRelationshipRecordsBackwardsStep(
mock( StageControl.class ), Configuration.DEFAULT, store );

Object batch = step.nextBatchOrNull( 0, batchSize );

assertNotNull( batch );

RelationshipRecord[] records = (RelationshipRecord[]) batch;
boolean hasRecordWithReservedId = Stream.of( records ).anyMatch( recordWithReservedId() );
assertFalse( "Batch contains record with reserved id " + Arrays.toString( records ), hasRecordWithReservedId );
}

private static Predicate<RelationshipRecord> recordWithReservedId()
{
return record -> record.getId() == IdGeneratorImpl.INTEGER_MINUS_ONE;
}
}
@@ -0,0 +1,55 @@
/*
* Copyright (c) 2002-2016 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.unsafe.impl.batchimport;

import org.junit.Test;

import org.neo4j.graphdb.factory.GraphDatabaseSettings;
import org.neo4j.kernel.impl.store.RelationshipStore;
import org.neo4j.kernel.impl.store.id.IdGeneratorImpl;
import org.neo4j.kernel.impl.store.record.RelationshipRecord;
import org.neo4j.unsafe.impl.batchimport.cache.NodeRelationshipCache;
import org.neo4j.unsafe.impl.batchimport.cache.NumberArrayFactory;
import org.neo4j.unsafe.impl.batchimport.staging.ExecutionMonitors;
import org.neo4j.unsafe.impl.batchimport.staging.ExecutionSupervisors;

import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;

public class RelationshipLinkbackStageTest
{
@Test
public void reservedIdIsSkipped() throws Exception
{
long highId = 5;
RelationshipStore store = StoreWithReservedId.newRelationshipStoreMock( highId );
RelationshipLinkbackStage stage = new RelationshipLinkbackStage( Configuration.DEFAULT, store, newCache() );

ExecutionSupervisors.superviseExecution( ExecutionMonitors.invisible(), Configuration.DEFAULT, stage );

verify( store, never() ).updateRecord( new RelationshipRecord( IdGeneratorImpl.INTEGER_MINUS_ONE ) );
}

private static NodeRelationshipCache newCache()
{
int denseNodeThreshold = Integer.parseInt( GraphDatabaseSettings.dense_node_threshold.getDefaultValue() );
return new NodeRelationshipCache( NumberArrayFactory.HEAP, denseNodeThreshold );
}
}

0 comments on commit 1c79e85

Please sign in to comment.