Skip to content

Commit

Permalink
Merge pull request #6834 from tinwelint/2.2-forbidden-id-fix
Browse files Browse the repository at this point in the history
Disallows use of reserved id
  • Loading branch information
spacecowboy committed Apr 1, 2016
2 parents e795b4c + d05ff3f commit d9139a1
Show file tree
Hide file tree
Showing 4 changed files with 107 additions and 8 deletions.
Expand Up @@ -19,6 +19,7 @@
*/
package org.neo4j.unsafe.impl.batchimport;

import org.neo4j.kernel.impl.store.id.IdGeneratorImpl;
import org.neo4j.kernel.impl.store.record.RelationshipRecord;
import org.neo4j.unsafe.impl.batchimport.cache.NodeRelationshipCache;
import org.neo4j.unsafe.impl.batchimport.input.InputRelationship;
Expand Down Expand Up @@ -60,6 +61,7 @@ public class ParallelizeByNodeIdStep extends ProcessorStep<Batch<InputRelationsh
{
private static final int MAX_PARALLELIZABLE_BATCHES = 10;

private final int batchSize;
private final int idBatchSize;

// Since this step is single-threaded and currently RelationshipEncoderStep isn't then this is a perfect
Expand All @@ -71,11 +73,18 @@ public class ParallelizeByNodeIdStep extends ProcessorStep<Batch<InputRelationsh
private int concurrentBatches;

public ParallelizeByNodeIdStep( StageControl control, Configuration config )
{
this( control, config, 0 );
}

public ParallelizeByNodeIdStep( StageControl control, Configuration config, long firstRecordId )
{
super( control, "PARALLELIZE", config, 1 );
// x2 since ids array cover both start and end nodes
this.idBatchSize = config.batchSize()*2;
this.batchSize = config.batchSize();
this.idBatchSize = batchSize*2;
this.concurrentNodeIds = new long[idBatchSize * MAX_PARALLELIZABLE_BATCHES];
this.firstRecordId = firstRecordId;
}

@Override
Expand All @@ -92,6 +101,14 @@ protected void process( Batch<InputRelationship,RelationshipRecord> batch, Batch

// Set state for the next batch
firstRecordId += batch.input.length;
if ( firstRecordId <= IdGeneratorImpl.INTEGER_MINUS_ONE &&
firstRecordId + batchSize >= IdGeneratorImpl.INTEGER_MINUS_ONE )
{
// There's this pesky INTEGER_MINUS_ONE ID again. Easiest is to simply skip this batch of ids
// or at least the part up to that id and just continue after it.
firstRecordId = IdGeneratorImpl.INTEGER_MINUS_ONE + 1;
}

if ( batch.parallelizableWithPrevious )
{
mergeSortedInto( batch.sortedIds, concurrentNodeIds, concurrentNodeIdsRange );
Expand Down
Expand Up @@ -23,22 +23,29 @@

import java.util.Arrays;

import org.neo4j.kernel.impl.store.id.IdGeneratorImpl;
import org.neo4j.kernel.impl.store.record.RelationshipRecord;
import org.neo4j.unsafe.impl.batchimport.Batch;
import org.neo4j.unsafe.impl.batchimport.ParallelizeByNodeIdStep;
import org.neo4j.unsafe.impl.batchimport.input.InputRelationship;
import org.neo4j.unsafe.impl.batchimport.staging.BatchSender;
import org.neo4j.unsafe.impl.batchimport.staging.Configuration;
import org.neo4j.unsafe.impl.batchimport.staging.ProcessorStep;
import org.neo4j.unsafe.impl.batchimport.staging.StageControl;

import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;

public class ParallelizeByNodeIdStepTest
{
private final StageControl control = mock( StageControl.class );
private final BatchSender sender = mock( BatchSender.class );

@Test
public void shouldDetectABA() throws Throwable
{
// GIVEN
StageControl control = mock( StageControl.class );
ProcessorStep<Batch<InputRelationship,RelationshipRecord>> step = new ParallelizeByNodeIdStep(
control, Configuration.DEFAULT );
int batchSize = Configuration.DEFAULT.batchSize();
Expand All @@ -50,7 +57,6 @@ public void shouldDetectABA() throws Throwable
setIds( aa, 0, 2, batchSize*2 );
Batch<InputRelationship,RelationshipRecord> bb = new Batch<>( new InputRelationship[batchSize] );
setIds( bb, 1, 2, batchSize*2 );
BatchSender sender = mock( BatchSender.class );

// WHEN
step.process( a, sender );
Expand All @@ -66,6 +72,34 @@ public void shouldDetectABA() throws Throwable
assertTrue( bb.parallelizableWithPrevious ); // because no id here collides with aa
}

@Test
public void shouldSkipReservervedId() throws Throwable
{
// GIVEN
ProcessorStep<Batch<InputRelationship,RelationshipRecord>> step = new ParallelizeByNodeIdStep( control,
Configuration.DEFAULT, IdGeneratorImpl.INTEGER_MINUS_ONE - 123_456 );
int batchSize = Configuration.DEFAULT.batchSize();

// WHEN
Batch<InputRelationship,RelationshipRecord> batch = new Batch<>( new InputRelationship[batchSize] );
batch.ids = new long[] {1L};
batch.sortedIds = batch.ids.clone();
while ( batch.firstRecordId < IdGeneratorImpl.INTEGER_MINUS_ONE )
{
step.process( batch, sender );
assertFalse( "Batch got first id " + batch.firstRecordId + " which contains the reserved id",
idWithin( IdGeneratorImpl.INTEGER_MINUS_ONE,
batch.firstRecordId, batch.firstRecordId + batchSize ) );
}

assertTrue( batch.firstRecordId > IdGeneratorImpl.INTEGER_MINUS_ONE );
}

private boolean idWithin( long id, long low, long high )
{
return id >= low && id <= high;
}

private void setIds( Batch<?,?> batch, long first, long stride, int count )
{
batch.ids = new long[count];
Expand Down
Expand Up @@ -35,6 +35,7 @@
import org.neo4j.kernel.ha.com.RequestContextFactory;
import org.neo4j.kernel.ha.com.master.Master;
import org.neo4j.kernel.impl.store.id.IdGenerator;
import org.neo4j.kernel.impl.store.id.IdGeneratorImpl;
import org.neo4j.kernel.impl.store.id.IdRange;
import org.neo4j.kernel.impl.util.StringLogger;
import org.neo4j.kernel.logging.Logging;
Expand Down Expand Up @@ -129,7 +130,7 @@ public void switchToSlave()
}
}

private static final long VALUE_REPRESENTING_NULL = -1;
static final long VALUE_REPRESENTING_NULL = -1;

private enum IdGeneratorState
{
Expand Down Expand Up @@ -391,7 +392,7 @@ public String toString()
}
}

private static class IdRangeIterator
static class IdRangeIterator
{
private int position = 0;
private final long[] defrag;
Expand All @@ -413,18 +414,27 @@ long next()
{
return defrag[position];
}
else

long candidate = nextRangeCandidate();
if ( candidate == IdGeneratorImpl.INTEGER_MINUS_ONE )
{
int offset = position - defrag.length;
return (offset < length) ? (start + offset) : VALUE_REPRESENTING_NULL;
position++;
candidate = nextRangeCandidate();
}
return candidate;
}
finally
{
++position;
}
}

private long nextRangeCandidate()
{
int offset = position - defrag.length;
return (offset < length) ? (start + offset) : VALUE_REPRESENTING_NULL;
}

@Override
public String toString()
{
Expand Down
Expand Up @@ -33,12 +33,15 @@
import org.neo4j.kernel.ha.DelegateInvocationHandler;
import org.neo4j.kernel.ha.com.RequestContextFactory;
import org.neo4j.kernel.ha.com.master.Master;
import org.neo4j.kernel.ha.id.HaIdGeneratorFactory.IdRangeIterator;
import org.neo4j.kernel.impl.store.id.IdGenerator;
import org.neo4j.kernel.impl.store.id.IdGeneratorImpl;
import org.neo4j.kernel.impl.store.id.IdRange;
import org.neo4j.kernel.logging.DevNullLoggingService;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
Expand All @@ -47,6 +50,8 @@
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import static org.neo4j.kernel.ha.id.HaIdGeneratorFactory.VALUE_REPRESENTING_NULL;

public class HaIdGeneratorFactoryTest
{
@Test
Expand Down Expand Up @@ -200,6 +205,39 @@ public void shouldTranslateComExceptionsIntoTransientTransactionFailures() throw
generator.nextId();
}

@Test
public void shouldNotUseForbiddenMinusOneIdFromIdBatches() throws Exception
{
// GIVEN
long[] defragIds = {3, 5};
int size = 10;
long low = IdGeneratorImpl.INTEGER_MINUS_ONE - size/2;
IdRange idRange = new IdRange( defragIds, low, size );

// WHEN
IdRangeIterator iterartor = new IdRangeIterator( idRange );

// THEN
for ( long id : defragIds )
{
assertEquals( id, iterartor.next() );
}

int expectedRangeSize = size - 1; // due to the forbidden id
for ( long i = 0, expectedId = low; i < expectedRangeSize; i++, expectedId++ )
{
if ( expectedId == IdGeneratorImpl.INTEGER_MINUS_ONE )
{
expectedId++;
}

long id = iterartor.next();
assertNotEquals( IdGeneratorImpl.INTEGER_MINUS_ONE, id );
assertEquals( expectedId, id );
}
assertEquals( VALUE_REPRESENTING_NULL, iterartor.next() );
}

private Master master;
private DelegateInvocationHandler<Master> masterDelegate;
private EphemeralFileSystemAbstraction fs;
Expand Down

0 comments on commit d9139a1

Please sign in to comment.