Skip to content

Commit

Permalink
Simplified PropertyEncoderStep by removing lots of code
Browse files Browse the repository at this point in the history
due to an id issue where more ids than expected were actually used
  • Loading branch information
tinwelint committed Dec 12, 2017
1 parent 0ce1b40 commit bcc3b31
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 190 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,15 @@
* An {@link IdSequence} which does internal batching by using another {@link IdSequence} as source of batches.
* Meant to be used by a single thread at a time.
*/
class RenewableBatchIdSequence implements IdSequence, Resource
public class RenewableBatchIdSequence implements IdSequence, Resource
{
private final IdSequence source;
private final int batchSize;
private final LongConsumer excessIdConsumer;
private IdSequence currentBatch;
private boolean closed;

RenewableBatchIdSequence( IdSequence source, int batchSize, LongConsumer excessIdConsumer )
public RenewableBatchIdSequence( IdSequence source, int batchSize, LongConsumer excessIdConsumer )
{
this.source = source;
this.batchSize = batchSize;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,13 @@
*/
package org.neo4j.unsafe.impl.batchimport;

import java.util.Iterator;

import org.neo4j.kernel.impl.store.DynamicRecordAllocator;
import org.neo4j.kernel.impl.store.PropertyStore;
import org.neo4j.kernel.impl.store.PropertyType;
import org.neo4j.kernel.impl.store.id.BatchingIdSequence;
import org.neo4j.kernel.impl.store.id.IdRangeIterator;
import org.neo4j.kernel.impl.store.id.IdSequence;
import org.neo4j.kernel.impl.store.record.DynamicRecord;
import org.neo4j.kernel.impl.store.StandardDynamicRecordAllocator;
import org.neo4j.kernel.impl.store.id.RenewableBatchIdSequence;
import org.neo4j.kernel.impl.store.record.PrimitiveRecord;
import org.neo4j.kernel.impl.store.record.PropertyBlock;
import org.neo4j.kernel.impl.store.record.PropertyRecord;
import org.neo4j.kernel.impl.store.record.Record;
import org.neo4j.kernel.impl.transaction.state.PropertyCreator;
import org.neo4j.kernel.impl.util.ReusableIteratorCostume;
import org.neo4j.kernel.impl.util.collection.ArrayCollection;
Expand All @@ -42,8 +37,6 @@
import org.neo4j.unsafe.impl.batchimport.store.BatchingPropertyRecordAccess;
import org.neo4j.unsafe.impl.batchimport.store.BatchingTokenRepository.BatchingPropertyKeyTokenRepository;

import static java.lang.Math.toIntExact;

/**
* Encodes property data into {@link PropertyRecord property records}, attaching them to each
* {@link Batch}. This step is designed to handle multiple threads doing the property encoding,
Expand All @@ -53,8 +46,7 @@ public class PropertyEncoderStep<RECORD extends PrimitiveRecord,INPUT extends In
extends ProcessorStep<Batch<INPUT,RECORD>>
{
private final BatchingPropertyKeyTokenRepository propertyKeyHolder;
private final int arrayDataSize;
private final int stringDataSize;
private final ThreadLocal<IdBatches> ids;
private final PropertyStore propertyStore;

protected PropertyEncoderStep( StageControl control, Configuration config,
Expand All @@ -63,167 +55,73 @@ protected PropertyEncoderStep( StageControl control, Configuration config,
super( control, "PROPERTIES", config, 0 );
this.propertyKeyHolder = propertyKeyHolder;
this.propertyStore = propertyStore;
this.arrayDataSize = propertyStore.getArrayStore().getRecordDataSize();
this.stringDataSize = propertyStore.getStringStore().getRecordDataSize();
this.ids = new ThreadLocal<IdBatches>()
{
@Override
protected IdBatches initialValue()
{
return new IdBatches( propertyStore );
}
};
}

@Override
protected void process( Batch<INPUT,RECORD> batch, BatchSender sender )
{
RelativeIdRecordAllocator stringAllocator = new RelativeIdRecordAllocator( stringDataSize );
RelativeIdRecordAllocator arrayAllocator = new RelativeIdRecordAllocator( arrayDataSize );
IdSequence relativePropertyRecordIds = new BatchingIdSequence();
PropertyCreator propertyCreator = new PropertyCreator( stringAllocator, arrayAllocator,
relativePropertyRecordIds, null, false );
IdBatches threadIds = ids.get();
PropertyCreator propertyCreator = new PropertyCreator( threadIds.stringIds, threadIds.arrayIds, threadIds.propertyIds, null,
propertyStore.allowStorePoints() );
ArrayCollection<PropertyRecord> propertyRecordCollection = new ArrayCollection<>( 4 );
BatchingPropertyRecordAccess propertyRecords = new BatchingPropertyRecordAccess();
ReusableIteratorCostume<PropertyBlock> blockIterator = new ReusableIteratorCostume<>();

batch.propertyRecords = new PropertyRecord[batch.input.length][];
int totalNumberOfProperties = 0;
int totalNumberOfPropertyRecords = 0;
for ( int i = 0; i < batch.input.length; i++ )
{
INPUT input = batch.input[i];
if ( !input.hasFirstPropertyId() )
{ // Encode the properties and attach the blocks to the Batch instance.
// Dynamic records for each entity will start from 0, they will be reassigned later anyway
int count = input.properties().length >> 1;
int count = input.propertyCount();
if ( count > 0 )
{
PropertyBlock[] propertyBlocks = new PropertyBlock[count];
propertyKeyHolder.propertyKeysAndValues( propertyBlocks, 0, input.properties(), propertyCreator );

// Create the property records with local ids, they will have to be reassigned to real ids later
propertyCreator.createPropertyChain( null, // owner assigned in a later step
blockIterator.dressArray( propertyBlocks, 0, count ),
propertyRecords, propertyRecordCollection::add );
batch.propertyRecords[i] = propertyRecordCollection.toArray(
new PropertyRecord[propertyRecordCollection.size()] );
totalNumberOfPropertyRecords += propertyRecordCollection.size();
batch.records[i].setNextProp( batch.propertyRecords[i][0].getId() );
batch.records[i].setIdTo( batch.propertyRecords[i][0] );
totalNumberOfProperties += count;
propertyRecordCollection.clear();
}
}
}

// Enter a synchronized block which assigns id ranges
IdRangeIterator propertyRecordsIdRange;
IdRangeIterator dynamicStringRecordsIdRange;
IdRangeIterator dynamicArrayRecordsIdRange;
synchronized ( propertyStore )
{
propertyRecordsIdRange = idRange( totalNumberOfPropertyRecords, propertyStore );
dynamicStringRecordsIdRange = idRange( toIntExact( stringAllocator.peek() ),
propertyStore.getStringStore() );
dynamicArrayRecordsIdRange = idRange( toIntExact( arrayAllocator.peek() ),
propertyStore.getArrayStore() );
}

// Do reassignment of ids here
for ( int i = 0; i < batch.input.length; i++ )
{
INPUT input = batch.input[i];
RECORD record = batch.records[i];
if ( record != null )
{
reassignPropertyIds( input, record, batch.propertyRecords[i],
propertyRecordsIdRange, dynamicStringRecordsIdRange, dynamicArrayRecordsIdRange );
}
}

// Assigned so that next single-threaded step can assign id ranges quickly
batch.numberOfProperties = totalNumberOfProperties;
sender.send( batch );
}

private static IdRangeIterator idRange( int size, IdSequence idSource )
private static class IdBatches
{
return size > 0 ? idSource.nextIdBatch( size ).iterator() : IdRangeIterator.EMPTY_ID_RANGE_ITERATOR;
}
final RenewableBatchIdSequence propertyIds;
final DynamicRecordAllocator stringIds;
final DynamicRecordAllocator arrayIds;

private static void reassignPropertyIds( InputEntity input, PrimitiveRecord record, PropertyRecord[] propertyRecords,
IdRangeIterator propertyRecordsIdRange,
IdRangeIterator dynamicStringRecordsIdRange,
IdRangeIterator dynamicArrayRecordsIdRange )
{
if ( input.hasFirstPropertyId() )
IdBatches( PropertyStore propertyStore )
{
record.setNextProp( input.firstPropertyId() );
}
else
{
if ( propertyRecords != null )
{
reassignDynamicRecordIds( dynamicStringRecordsIdRange, dynamicArrayRecordsIdRange, propertyRecords );
long firstProp = reassignPropertyRecordIds( record, propertyRecordsIdRange, propertyRecords );
record.setNextProp( firstProp );
}
}
}

private static long reassignPropertyRecordIds( PrimitiveRecord record, IdRangeIterator ids,
PropertyRecord[] propertyRecords )
{
long newId = ids.nextId();
long firstId = newId;
PropertyRecord prev = null;
for ( PropertyRecord propertyRecord : propertyRecords )
{
record.setIdTo( propertyRecord );
propertyRecord.setId( newId );
if ( !Record.NO_NEXT_PROPERTY.is( propertyRecord.getNextProp() ) )
{
propertyRecord.setNextProp( newId = ids.nextId() );
}
if ( prev != null )
{
propertyRecord.setPrevProp( prev.getId() );
}
prev = propertyRecord;
}
return firstId;
}

private static void reassignDynamicRecordIds( IdRangeIterator stringRecordsIds, IdRangeIterator arrayRecordsIds,
PropertyRecord[] propertyRecords )
{
// OK, so here we have property blocks, potentially referring to DynamicRecords. The DynamicRecords
// have ids that we need to re-assign in here, because the ids are generated by multiple property encoders,
// and so we let each one of the encoders generate their own bogus ids and we re-assign those ids here,
// where we know we have a single thread doing this.
for ( PropertyRecord propertyRecord : propertyRecords )
{
for ( PropertyBlock block : propertyRecord )
{
PropertyType type = block.getType();
switch ( type )
{
case STRING:
reassignDynamicRecordIds( block, type, stringRecordsIds );
break;
case ARRAY:
reassignDynamicRecordIds( block, type, arrayRecordsIds );
break;
default: // No need to do anything be default, we only need to relink for dynamic records
}
}
}
}

private static void reassignDynamicRecordIds( PropertyBlock block, PropertyType type, IdRangeIterator ids )
{
Iterator<DynamicRecord> dynamicRecords = block.getValueRecords().iterator();
long newId = ids.nextId();
block.getValueBlocks()[0] = PropertyStore.singleBlockLongValue( block.getKeyIndexId(), type, newId );
while ( dynamicRecords.hasNext() )
{
DynamicRecord dynamicRecord = dynamicRecords.next();
dynamicRecord.setId( newId );
if ( dynamicRecords.hasNext() )
{
dynamicRecord.setNextBlock( newId = ids.nextId() );
}
this.propertyIds = new RenewableBatchIdSequence( propertyStore, propertyStore.getRecordsPerPage(), id -> {} );
this.stringIds = new StandardDynamicRecordAllocator(
new RenewableBatchIdSequence( propertyStore.getStringStore(),
propertyStore.getStringStore().getRecordsPerPage(), id -> {} ),
propertyStore.getStringStore().getRecordDataSize() );
this.arrayIds = new StandardDynamicRecordAllocator(
new RenewableBatchIdSequence( propertyStore.getArrayStore(),
propertyStore.getArrayStore().getRecordsPerPage(), id -> {} ),
propertyStore.getArrayStore().getRecordDataSize() );
}
}
}

This file was deleted.

0 comments on commit bcc3b31

Please sign in to comment.