Skip to content

Commit

Permalink
Importer can write batches of records in parallel
Browse files Browse the repository at this point in the history
This means that there may be multiple threads writing records into
the stores as the final step in most stages. This doesn't mean that
there will be multiple threads flushing pages to disk since flushing
is separated from writing records.

There have been some amount of changes to which steps does what and
generally more work have been moved into steps that are naturally
parallelizable already. For example property encoder step now also
creates property records instead of the writer. For this purpose
the IdRangeIterator moved from ha component and IdSequence got a new
method nextIdBatch, which some implementations already had.
  • Loading branch information
tinwelint committed Apr 27, 2017
1 parent a845ebd commit b5bfe6e
Show file tree
Hide file tree
Showing 16 changed files with 560 additions and 205 deletions.
Expand Up @@ -88,4 +88,10 @@ public static void assertIdWithinCapacity( long id, long maxId )
throw new IdCapacityExceededException( id, maxId ); throw new IdCapacityExceededException( id, maxId );
} }
} }

public static boolean hasReservedIdInRange( long startIdInclusive, long endIdExclusive )
{
return startIdInclusive <= IdGeneratorImpl.INTEGER_MINUS_ONE &&
endIdExclusive > IdGeneratorImpl.INTEGER_MINUS_ONE;
}
} }
Expand Up @@ -20,6 +20,7 @@
package org.neo4j.kernel.impl.transaction.state; package org.neo4j.kernel.impl.transaction.state;


import java.util.Iterator; import java.util.Iterator;
import java.util.function.Consumer;


import org.neo4j.kernel.impl.store.DynamicRecordAllocator; import org.neo4j.kernel.impl.store.DynamicRecordAllocator;
import org.neo4j.kernel.impl.store.PropertyStore; import org.neo4j.kernel.impl.store.PropertyStore;
Expand Down Expand Up @@ -196,13 +197,21 @@ public PropertyBlock encodeValue( PropertyBlock block, int propertyKey, Object v


public long createPropertyChain( PrimitiveRecord owner, Iterator<PropertyBlock> properties, public long createPropertyChain( PrimitiveRecord owner, Iterator<PropertyBlock> properties,
RecordAccess<Long, PropertyRecord, PrimitiveRecord> propertyRecords ) RecordAccess<Long, PropertyRecord, PrimitiveRecord> propertyRecords )
{
return createPropertyChain( owner, properties, propertyRecords, p -> {} );
}

public long createPropertyChain( PrimitiveRecord owner, Iterator<PropertyBlock> properties,
RecordAccess<Long, PropertyRecord, PrimitiveRecord> propertyRecords,
Consumer<PropertyRecord> createdPropertyRecords )
{ {
if ( properties == null || !properties.hasNext() ) if ( properties == null || !properties.hasNext() )
{ {
return Record.NO_NEXT_PROPERTY.intValue(); return Record.NO_NEXT_PROPERTY.intValue();
} }
PropertyRecord currentRecord = propertyRecords.create( propertyRecordIdGenerator.nextId(), owner ) PropertyRecord currentRecord = propertyRecords.create( propertyRecordIdGenerator.nextId(), owner )
.forChangingData(); .forChangingData();
createdPropertyRecords.accept( currentRecord );
currentRecord.setInUse( true ); currentRecord.setInUse( true );
currentRecord.setCreated(); currentRecord.setCreated();
PropertyRecord firstRecord = currentRecord; PropertyRecord firstRecord = currentRecord;
Expand All @@ -216,6 +225,7 @@ public long createPropertyChain( PrimitiveRecord owner, Iterator<PropertyBlock>
// Create new record // Create new record
long propertyId = propertyRecordIdGenerator.nextId(); long propertyId = propertyRecordIdGenerator.nextId();
currentRecord = propertyRecords.create( propertyId, owner ).forChangingData(); currentRecord = propertyRecords.create( propertyId, owner ).forChangingData();
createdPropertyRecords.accept( currentRecord );
currentRecord.setInUse( true ); currentRecord.setInUse( true );
currentRecord.setCreated(); currentRecord.setCreated();
// Set up links // Set up links
Expand Down
Expand Up @@ -20,7 +20,7 @@
package org.neo4j.unsafe.impl.batchimport; package org.neo4j.unsafe.impl.batchimport;


import org.neo4j.kernel.impl.store.record.PrimitiveRecord; import org.neo4j.kernel.impl.store.record.PrimitiveRecord;
import org.neo4j.kernel.impl.store.record.PropertyBlock; import org.neo4j.kernel.impl.store.record.PropertyRecord;
import org.neo4j.unsafe.impl.batchimport.staging.Stage; import org.neo4j.unsafe.impl.batchimport.staging.Stage;
import org.neo4j.unsafe.impl.batchimport.staging.Step; import org.neo4j.unsafe.impl.batchimport.staging.Step;


Expand All @@ -39,15 +39,12 @@ public class Batch<INPUT,RECORD extends PrimitiveRecord>


public final INPUT[] input; public final INPUT[] input;
public RECORD[] records; public RECORD[] records;
public int[] propertyBlocksLengths;
// This is a special succer. All property blocks for ALL records in this batch sits in this public PropertyRecord[][] propertyRecords;
// single array. The number of property blocks for a given record sits in propertyBlocksLengths public int numberOfProperties;
// using the same index as the record. So it's a collective size suitable for complete looping
// over the batch. // Used by relationship stages to query idMapper and store ids here
public PropertyBlock[] propertyBlocks;
// Used by relationship staged to query idMapper and store ids here
public long[] ids; public long[] ids;
public boolean parallelizableWithPrevious;
public long firstRecordId; public long firstRecordId;
public long[][] labels; public long[][] labels;


Expand Down
Expand Up @@ -19,24 +19,16 @@
*/ */
package org.neo4j.unsafe.impl.batchimport; package org.neo4j.unsafe.impl.batchimport;


import java.util.Iterator;

import org.neo4j.kernel.impl.store.AbstractDynamicStore;
import org.neo4j.kernel.impl.store.CommonAbstractStore; import org.neo4j.kernel.impl.store.CommonAbstractStore;
import org.neo4j.kernel.impl.store.PropertyStore; import org.neo4j.kernel.impl.store.PropertyStore;
import org.neo4j.kernel.impl.store.PropertyType;
import org.neo4j.kernel.impl.store.StoreHeader; import org.neo4j.kernel.impl.store.StoreHeader;
import org.neo4j.kernel.impl.store.record.DynamicRecord;
import org.neo4j.kernel.impl.store.record.PrimitiveRecord; import org.neo4j.kernel.impl.store.record.PrimitiveRecord;
import org.neo4j.kernel.impl.store.record.PropertyBlock; import org.neo4j.kernel.impl.store.record.PropertyBlock;
import org.neo4j.kernel.impl.store.record.PropertyRecord; import org.neo4j.kernel.impl.store.record.PropertyRecord;
import org.neo4j.kernel.impl.transaction.state.PropertyCreator;
import org.neo4j.kernel.impl.util.ReusableIteratorCostume;
import org.neo4j.unsafe.impl.batchimport.input.InputEntity; import org.neo4j.unsafe.impl.batchimport.input.InputEntity;
import org.neo4j.unsafe.impl.batchimport.staging.BatchSender; import org.neo4j.unsafe.impl.batchimport.staging.BatchSender;
import org.neo4j.unsafe.impl.batchimport.staging.ProcessorStep; import org.neo4j.unsafe.impl.batchimport.staging.ProcessorStep;
import org.neo4j.unsafe.impl.batchimport.staging.StageControl; import org.neo4j.unsafe.impl.batchimport.staging.StageControl;
import org.neo4j.unsafe.impl.batchimport.store.BatchingPropertyRecordAccess;
import org.neo4j.unsafe.impl.batchimport.store.io.IoMonitor; import org.neo4j.unsafe.impl.batchimport.store.io.IoMonitor;


import static java.lang.Math.max; import static java.lang.Math.max;
Expand All @@ -62,34 +54,25 @@ public interface Monitor
private final CommonAbstractStore<RECORD,? extends StoreHeader> entityStore; private final CommonAbstractStore<RECORD,? extends StoreHeader> entityStore;
private final PropertyStore propertyStore; private final PropertyStore propertyStore;
private final IoMonitor ioMonitor; private final IoMonitor ioMonitor;
private final PropertyCreator propertyCreator;
private final Monitor monitor; private final Monitor monitor;
private long highestId; private final HighestId highestId = new HighestId();

// Reusable instances for less GC
private final BatchingPropertyRecordAccess propertyRecords = new BatchingPropertyRecordAccess();
private final ReusableIteratorCostume<PropertyBlock> blockIterator = new ReusableIteratorCostume<>();


EntityStoreUpdaterStep( StageControl control, Configuration config, EntityStoreUpdaterStep( StageControl control, Configuration config,
CommonAbstractStore<RECORD,? extends StoreHeader> entityStore, CommonAbstractStore<RECORD,? extends StoreHeader> entityStore,
PropertyStore propertyStore, IoMonitor ioMonitor, PropertyStore propertyStore, IoMonitor ioMonitor,
Monitor monitor ) Monitor monitor )
{ {
super( control, "v", config, 1, ioMonitor ); super( control, "v", config, 0, ioMonitor );
this.entityStore = entityStore; this.entityStore = entityStore;
this.propertyStore = propertyStore; this.propertyStore = propertyStore;
this.monitor = monitor; this.monitor = monitor;
this.propertyCreator = new PropertyCreator( propertyStore, null );
this.ioMonitor = ioMonitor; this.ioMonitor = ioMonitor;
this.ioMonitor.reset(); this.ioMonitor.reset();
} }


@Override @Override
protected void process( Batch<INPUT,RECORD> batch, BatchSender sender ) protected void process( Batch<INPUT,RECORD> batch, BatchSender sender )
{ {
// Clear reused data structures
propertyRecords.close();

// Write the entity records, and at the same time allocate property records for its property blocks. // Write the entity records, and at the same time allocate property records for its property blocks.
long highestId = 0; long highestId = 0;
RECORD[] records = batch.records; RECORD[] records = batch.records;
Expand All @@ -98,31 +81,13 @@ protected void process( Batch<INPUT,RECORD> batch, BatchSender sender )
return; return;
} }


int propertyBlockCursor = 0, skipped = 0; int skipped = 0;
for ( int i = 0; i < records.length; i++ ) for ( int i = 0; i < records.length; i++ )
{ {
RECORD record = records[i]; RECORD record = records[i];


int propertyBlockCount = batch.propertyBlocksLengths[i];
if ( record != null ) if ( record != null )
{ {
INPUT input = batch.input[i];
if ( input.hasFirstPropertyId() )
{
record.setNextProp( input.firstPropertyId() );
}
else
{
if ( propertyBlockCount > 0 )
{
reassignDynamicRecordIds( propertyStore, batch.propertyBlocks,
propertyBlockCursor, propertyBlockCount );
long firstProp = propertyCreator.createPropertyChain( record,
blockIterator.dressArray( batch.propertyBlocks, propertyBlockCursor, propertyBlockCount ),
propertyRecords );
record.setNextProp( firstProp );
}
}
highestId = max( highestId, record.getId() ); highestId = max( highestId, record.getId() );
entityStore.prepareForCommit( record ); entityStore.prepareForCommit( record );
entityStore.updateRecord( record ); entityStore.updateRecord( record );
Expand All @@ -132,55 +97,27 @@ protected void process( Batch<INPUT,RECORD> batch, BatchSender sender )
// of number of bad relationships. Just don't import this relationship. // of number of bad relationships. Just don't import this relationship.
skipped++; skipped++;
} }
propertyBlockCursor += propertyBlockCount;
} }
this.highestId = highestId;


// Write all the created property records. this.highestId.offer( highestId );
for ( PropertyRecord propertyRecord : propertyRecords.records() ) writePropertyRecords( batch.propertyRecords, propertyStore );
{
propertyStore.updateRecord( propertyRecord );
}


monitor.entitiesWritten( records[0].getClass(), records.length - skipped ); monitor.entitiesWritten( records[0].getClass(), records.length - skipped );
monitor.propertiesWritten( propertyBlockCursor ); monitor.propertiesWritten( batch.numberOfProperties );
} }


static void reassignDynamicRecordIds( PropertyStore propertyStore, PropertyBlock[] blocks, int offset, int length ) static void writePropertyRecords( PropertyRecord[][] batch, PropertyStore propertyStore )
{ {
// OK, so here we have property blocks, potentially referring to DynamicRecords. The DynamicRecords // Write all the created property records.
// have ids that we need to re-assign in here, because the ids are generated by multiple property encoders, for ( PropertyRecord[] propertyRecords : batch )
// and so we let each one of the encoders generate their own bogus ids and we re-assign those ids here,
// where we know we have a single thread doing this.
for ( int i = 0; i < length; i++ )
{
PropertyBlock block = blocks[offset + i];
PropertyType type = block.getType();
switch ( type )
{
case STRING:
reassignDynamicRecordIds( block, type, propertyStore.getStringStore() );
break;
case ARRAY:
reassignDynamicRecordIds( block, type, propertyStore.getArrayStore() );
break;
default: // No need to do anything be default, we only need to relink for dynamic records
}
}
}

static void reassignDynamicRecordIds( PropertyBlock block, PropertyType type, AbstractDynamicStore store )
{
Iterator<DynamicRecord> dynamicRecords = block.getValueRecords().iterator();
long newId = store.nextId();
block.getValueBlocks()[0] = PropertyStore.singleBlockLongValue( block.getKeyIndexId(), type, newId );
while ( dynamicRecords.hasNext() )
{ {
DynamicRecord dynamicRecord = dynamicRecords.next(); if ( propertyRecords != null )
dynamicRecord.setId( newId );
if ( dynamicRecords.hasNext() )
{ {
dynamicRecord.setNextBlock( newId = store.nextId() ); for ( PropertyRecord propertyRecord : propertyRecords )
{
propertyStore.prepareForCommit( propertyRecord );
propertyStore.updateRecord( propertyRecord );
}
} }
} }
} }
Expand All @@ -194,6 +131,6 @@ protected void done()
// NodeStage completes before CalculateDenseNodesStage then we want to stop the time in the I/O monitor. // NodeStage completes before CalculateDenseNodesStage then we want to stop the time in the I/O monitor.
ioMonitor.stop(); ioMonitor.stop();


entityStore.setHighestPossibleIdInUse( highestId ); entityStore.setHighestPossibleIdInUse( highestId.get() );
} }
} }
@@ -0,0 +1,49 @@
/*
* Copyright (c) 2002-2017 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.unsafe.impl.batchimport;

import java.util.concurrent.atomic.AtomicLong;

/**
* Tracks a highest id when there are potentially multiple concurrent threads calling {@link #offer(long)}.
*/
public class HighestId
{
private final AtomicLong highestId = new AtomicLong();

public void offer( long candidate )
{
long currentHighest;
do
{
currentHighest = highestId.get();
if ( candidate < currentHighest )
{
return;
}
}
while ( !highestId.compareAndSet( currentHighest, candidate ) );
}

public long get()
{
return highestId.get();
}
}
Expand Up @@ -80,9 +80,9 @@ public NodeStage( Configuration config, IoMonitor writeMonitor,


nodeStore = neoStore.getNodeStore(); nodeStore = neoStore.getNodeStore();
PropertyStore propertyStore = neoStore.getPropertyStore(); PropertyStore propertyStore = neoStore.getPropertyStore();
add( new PropertyEncoderStep<>( control(), config, neoStore.getPropertyKeyRepository(), propertyStore ) );
add( new NodeEncoderStep( control(), config, idMapper, idGenerator, add( new NodeEncoderStep( control(), config, idMapper, idGenerator,
neoStore.getLabelRepository(), nodeStore, memoryUsage ) ); neoStore.getLabelRepository(), nodeStore, memoryUsage ) );
add( new PropertyEncoderStep<>( control(), config, neoStore.getPropertyKeyRepository(), propertyStore ) );
add( new LabelScanStorePopulationStep( control(), config, labelScanStore ) ); add( new LabelScanStorePopulationStep( control(), config, labelScanStore ) );
add( new EntityStoreUpdaterStep<>( control(), config, nodeStore, propertyStore, writeMonitor, add( new EntityStoreUpdaterStep<>( control(), config, nodeStore, propertyStore, writeMonitor,
storeUpdateMonitor ) ); storeUpdateMonitor ) );
Expand Down

0 comments on commit b5bfe6e

Please sign in to comment.