Skip to content

Commit

Permalink
Lets linking stages also use the prepare id sequence
Browse files Browse the repository at this point in the history
please squash with previous commit at some point
  • Loading branch information
tinwelint committed Dec 12, 2017
1 parent 3beb9f9 commit 0ce1b40
Show file tree
Hide file tree
Showing 17 changed files with 102 additions and 110 deletions.
Expand Up @@ -19,12 +19,10 @@
*/
package org.neo4j.unsafe.impl.batchimport;

import java.util.function.Function;
import java.util.function.LongFunction;

import org.neo4j.kernel.impl.store.CommonAbstractStore;
import org.neo4j.kernel.impl.store.PropertyStore;
import org.neo4j.kernel.impl.store.RecordStore;
import org.neo4j.kernel.impl.store.StoreHeader;
import org.neo4j.kernel.impl.store.id.IdSequence;
import org.neo4j.kernel.impl.store.record.PrimitiveRecord;
Expand All @@ -34,6 +32,7 @@
import org.neo4j.unsafe.impl.batchimport.staging.BatchSender;
import org.neo4j.unsafe.impl.batchimport.staging.ProcessorStep;
import org.neo4j.unsafe.impl.batchimport.staging.StageControl;
import org.neo4j.unsafe.impl.batchimport.store.PrepareIdSequence;
import org.neo4j.unsafe.impl.batchimport.store.io.IoMonitor;

import static java.lang.Math.max;
Expand All @@ -60,12 +59,12 @@ public interface Monitor
private final PropertyStore propertyStore;
private final IoMonitor ioMonitor;
private final Monitor monitor;
private final Function<RecordStore<RECORD>,LongFunction<IdSequence>> prepareIdSequence;
private final PrepareIdSequence<RECORD> prepareIdSequence;

EntityStoreUpdaterStep( StageControl control, Configuration config,
CommonAbstractStore<RECORD,? extends StoreHeader> entityStore,
PropertyStore propertyStore, IoMonitor ioMonitor,
Monitor monitor, Function<RecordStore<RECORD>,LongFunction<IdSequence>> prepareIdSequence )
Monitor monitor, PrepareIdSequence<RECORD> prepareIdSequence )
{
super( control, "v", config, config.parallelRecordWrites() ? 0 : 1, ioMonitor );
this.entityStore = entityStore;
Expand Down
Expand Up @@ -122,7 +122,6 @@ public class ImportLogic implements Closeable
private InputIterable<InputNode> cachedNodes;
private long peakMemoryUsage;
private long availableMemoryForLinking;
private boolean doubleRelationshipRecordUnits;

/**
* @param storeDir directory which the db will be created in.
Expand Down Expand Up @@ -172,6 +171,10 @@ public void initialize( Input input ) throws IOException
// TODO figure out in a good way if this is the high limit format
recordFormats.relationship().getMaxId() > 2L << 36 &&
inputEstimates.numberOfRelationships() > DOUBLE_RELATIONSHIP_RECORD_UNIT_THRESHOLD;
if ( doubleRelationshipRecordUnits )
{
System.out.println( "Will use double record units for all relationships" );
}

executionMonitor.initialize( dependencies );
}
Expand Down Expand Up @@ -268,7 +271,7 @@ public void importRelationships() throws IOException
configWithRecordsPerPageBasedBatchSize( config, neoStore.getRelationshipStore() );
RelationshipStage unlinkedRelationshipStage =
new RelationshipStage( relationshipConfig, writeMonitor, relationships, idMapper,
badCollector, inputCache, neoStore, storeUpdateMonitor, doubleRelationshipRecordUnits );
badCollector, inputCache, neoStore, storeUpdateMonitor );
neoStore.startFlushingPageCache();
executeStage( unlinkedRelationshipStage );
neoStore.stopFlushingPageCache();
Expand Down Expand Up @@ -361,7 +364,7 @@ public int linkRelationships( int startingFromType )

// LINK Forward
RelationshipLinkforwardStage linkForwardStage = new RelationshipLinkforwardStage( topic, relationshipConfig,
neoStore.getRelationshipStore(), nodeRelationshipCache, readFilter, denseChangeFilter, nodeTypes,
neoStore, nodeRelationshipCache, readFilter, denseChangeFilter, nodeTypes,
new RelationshipLinkingProgress(), memoryUsageStats );
executeStage( linkForwardStage );

Expand All @@ -377,8 +380,9 @@ public int linkRelationships( int startingFromType )

// LINK backward
nodeRelationshipCache.setForwardScan( false, true/*dense*/ );
executeStage( new RelationshipLinkbackStage( topic, relationshipConfig, neoStore.getRelationshipStore(),
nodeRelationshipCache, readFilter, denseChangeFilter, nodeTypes, new RelationshipLinkingProgress(), memoryUsageStats ) );
executeStage( new RelationshipLinkbackStage( topic, relationshipConfig, neoStore,
nodeRelationshipCache, readFilter, denseChangeFilter, nodeTypes,
new RelationshipLinkingProgress(), memoryUsageStats ) );

updatePeakMemoryUsage();

Expand Down
Expand Up @@ -26,6 +26,7 @@
import org.neo4j.unsafe.impl.batchimport.staging.BatchFeedStep;
import org.neo4j.unsafe.impl.batchimport.staging.ReadRecordsStep;
import org.neo4j.unsafe.impl.batchimport.staging.Stage;
import org.neo4j.unsafe.impl.batchimport.store.StorePrepareIdSequence;

import static org.neo4j.unsafe.impl.batchimport.RecordIdIterator.allIn;

Expand All @@ -44,6 +45,6 @@ public NodeFirstGroupStage( Configuration config, RecordStore<RelationshipGroupR
add( new BatchFeedStep( control(), config, allIn( groupStore, config ), groupStore.getRecordSize() ) );
add( new ReadRecordsStep<>( control(), config, true, groupStore, null ) );
add( new NodeSetFirstGroupStep( control(), config, nodeStore, cache ) );
add( new UpdateRecordsStep<>( control(), config, nodeStore ) );
add( new UpdateRecordsStep<>( control(), config, nodeStore, new StorePrepareIdSequence<>() ) );
}
}
Expand Up @@ -35,6 +35,7 @@
import org.neo4j.unsafe.impl.batchimport.staging.Stage;
import org.neo4j.unsafe.impl.batchimport.stats.StatsProvider;
import org.neo4j.unsafe.impl.batchimport.store.BatchingNeoStores;
import org.neo4j.unsafe.impl.batchimport.store.StorePrepareIdSequence;
import org.neo4j.unsafe.impl.batchimport.store.io.IoMonitor;

import static org.neo4j.unsafe.impl.batchimport.input.InputCache.MAIN;
Expand Down
Expand Up @@ -24,6 +24,7 @@
import org.neo4j.kernel.impl.store.record.RelationshipGroupRecord;
import org.neo4j.unsafe.impl.batchimport.cache.NodeRelationshipCache;
import org.neo4j.unsafe.impl.batchimport.staging.Stage;
import org.neo4j.unsafe.impl.batchimport.store.StorePrepareIdSequence;

/**
* Takes information about relationship groups in the {@link NodeRelationshipCache}, which is produced
Expand All @@ -38,6 +39,6 @@ public RelationshipGroupStage( String topic, Configuration config,
{
super( NAME, topic, config, 0 );
add( new ReadGroupRecordsByCacheStep( control(), config, store, cache ) );
add( new UpdateRecordsStep<>( control(), config, store ) );
add( new UpdateRecordsStep<>( control(), config, store, new StorePrepareIdSequence<>() ) );
}
}
Expand Up @@ -28,6 +28,8 @@
import org.neo4j.unsafe.impl.batchimport.staging.ReadRecordsStep;
import org.neo4j.unsafe.impl.batchimport.staging.Stage;
import org.neo4j.unsafe.impl.batchimport.stats.StatsProvider;
import org.neo4j.unsafe.impl.batchimport.store.BatchingNeoStores;
import org.neo4j.unsafe.impl.batchimport.store.PrepareIdSequence;

import static org.neo4j.unsafe.impl.batchimport.RecordIdIterator.backwards;
import static org.neo4j.unsafe.impl.batchimport.staging.Step.ORDER_SEND_DOWNSTREAM;
Expand All @@ -52,15 +54,15 @@ public class RelationshipLinkbackStage extends Stage
{
public static final String NAME = "Relationship <-- Relationship";

public RelationshipLinkbackStage( String topic, Configuration config, RelationshipStore store,
public RelationshipLinkbackStage( String topic, Configuration config, BatchingNeoStores stores,
NodeRelationshipCache cache, Predicate<RelationshipRecord> readFilter,
Predicate<RelationshipRecord> changeFilter, int nodeTypes,
StatsProvider... additionalStatsProvider )
Predicate<RelationshipRecord> changeFilter, int nodeTypes, StatsProvider... additionalStatsProvider )
{
super( NAME, topic, config, ORDER_SEND_DOWNSTREAM );
RelationshipStore store = stores.getRelationshipStore();
add( new BatchFeedStep( control(), config, backwards( 0, store.getHighId(), config ), store.getRecordSize() ) );
add( new ReadRecordsStep<>( control(), config, true, store, readFilter ) );
add( new RelationshipLinkbackStep( control(), config, cache, changeFilter, nodeTypes, additionalStatsProvider ) );
add( new UpdateRecordsStep<>( control(), config, store ) );
add( new UpdateRecordsStep<>( control(), config, store, PrepareIdSequence.of( stores.usesDoubleRelationshipRecordUnits() ) ) );
}
}
Expand Up @@ -28,6 +28,8 @@
import org.neo4j.unsafe.impl.batchimport.staging.ReadRecordsStep;
import org.neo4j.unsafe.impl.batchimport.staging.Stage;
import org.neo4j.unsafe.impl.batchimport.stats.StatsProvider;
import org.neo4j.unsafe.impl.batchimport.store.BatchingNeoStores;
import org.neo4j.unsafe.impl.batchimport.store.PrepareIdSequence;

import static org.neo4j.unsafe.impl.batchimport.RecordIdIterator.forwards;
import static org.neo4j.unsafe.impl.batchimport.staging.Step.ORDER_SEND_DOWNSTREAM;
Expand All @@ -36,15 +38,15 @@ public class RelationshipLinkforwardStage extends Stage
{
public static final String NAME = "Relationship --> Relationship";

public RelationshipLinkforwardStage( String topic, Configuration config, RelationshipStore store,
public RelationshipLinkforwardStage( String topic, Configuration config, BatchingNeoStores stores,
NodeRelationshipCache cache, Predicate<RelationshipRecord> readFilter,
Predicate<RelationshipRecord> denseChangeFilter, int nodeTypes,
StatsProvider... additionalStatsProvider )
Predicate<RelationshipRecord> denseChangeFilter, int nodeTypes, StatsProvider... additionalStatsProvider )
{
super( NAME, topic, config, ORDER_SEND_DOWNSTREAM );
RelationshipStore store = stores.getRelationshipStore();
add( new BatchFeedStep( control(), config, forwards( 0, store.getHighId(), config ), store.getRecordSize() ) );
add( new ReadRecordsStep<>( control(), config, true, store, readFilter ) );
add( new RelationshipLinkforwardStep( control(), config, cache, denseChangeFilter, nodeTypes, additionalStatsProvider ) );
add( new UpdateRecordsStep<>( control(), config, store ) );
add( new UpdateRecordsStep<>( control(), config, store, PrepareIdSequence.of( stores.usesDoubleRelationshipRecordUnits() ) ) );
}
}
Expand Up @@ -20,13 +20,8 @@
package org.neo4j.unsafe.impl.batchimport;

import java.io.IOException;
import java.util.function.Function;
import java.util.function.LongFunction;

import org.neo4j.kernel.impl.store.PropertyStore;
import org.neo4j.kernel.impl.store.RecordStore;
import org.neo4j.kernel.impl.store.RelationshipStore;
import org.neo4j.kernel.impl.store.id.IdSequence;
import org.neo4j.kernel.impl.store.record.PropertyBlock;
import org.neo4j.kernel.impl.store.record.PropertyRecord;
import org.neo4j.kernel.impl.store.record.RelationshipRecord;
Expand All @@ -37,6 +32,7 @@
import org.neo4j.unsafe.impl.batchimport.input.InputRelationship;
import org.neo4j.unsafe.impl.batchimport.staging.Stage;
import org.neo4j.unsafe.impl.batchimport.store.BatchingNeoStores;
import org.neo4j.unsafe.impl.batchimport.store.PrepareIdSequence;
import org.neo4j.unsafe.impl.batchimport.store.io.IoMonitor;

import static org.neo4j.unsafe.impl.batchimport.input.InputCache.MAIN;
Expand Down Expand Up @@ -68,7 +64,7 @@ public class RelationshipStage extends Stage
public RelationshipStage( Configuration config, IoMonitor writeMonitor,
InputIterable<InputRelationship> relationships, IdMapper idMapper,
Collector badCollector, InputCache inputCache,
BatchingNeoStores neoStore, CountingStoreUpdateMonitor storeUpdateMonitor, boolean doubleRelationshipRecordUnits )
BatchingNeoStores neoStore, CountingStoreUpdateMonitor storeUpdateMonitor )
throws IOException
{
super( NAME, null, config, ORDER_SEND_DOWNSTREAM );
Expand All @@ -84,15 +80,10 @@ public RelationshipStage( Configuration config, IoMonitor writeMonitor,
add( typer = new RelationshipTypeCheckerStep( control(), config, neoStore.getRelationshipTypeRepository(), storeUpdateMonitor ) );
add( new RelationshipPreparationStep( control(), config, idMapper ) );
add( new RelationshipRecordPreparationStep( control(), config,
neoStore.getRelationshipTypeRepository(), badCollector, relationshipStore, doubleRelationshipRecordUnits ) );
neoStore.getRelationshipTypeRepository(), badCollector, relationshipStore, neoStore.usesDoubleRelationshipRecordUnits() ) );
add( new PropertyEncoderStep<>( control(), config, neoStore.getPropertyKeyRepository(), propertyStore ) );

Function<RecordStore<RelationshipRecord>,LongFunction<IdSequence>> prepareIdSequence = doubleRelationshipRecordUnits
? new SecondaryUnitPrepareIdSequence<>()
: new StorePrepareIdSequence<>();

add( new EntityStoreUpdaterStep<>( control(), config, relationshipStore, propertyStore,
writeMonitor, storeUpdateMonitor, prepareIdSequence ) );
writeMonitor, storeUpdateMonitor, PrepareIdSequence.of( neoStore.usesDoubleRelationshipRecordUnits() ) ) );
}

public DataStatistics getDistribution()
Expand Down
Expand Up @@ -25,6 +25,7 @@
import org.neo4j.unsafe.impl.batchimport.cache.NodeType;
import org.neo4j.unsafe.impl.batchimport.staging.ReadRecordsStep;
import org.neo4j.unsafe.impl.batchimport.staging.Stage;
import org.neo4j.unsafe.impl.batchimport.store.StorePrepareIdSequence;

import static org.neo4j.unsafe.impl.batchimport.staging.Step.ORDER_SEND_DOWNSTREAM;

Expand All @@ -50,6 +51,6 @@ public SparseNodeFirstRelationshipStage( Configuration config, NodeStore nodeSto
add( new ReadRecordsStep<>( control(), config, true, nodeStore, null ) );
add( new RecordProcessorStep<>( control(), "LINK", config,
new SparseNodeFirstRelationshipProcessor( cache ), false ) );
add( new UpdateRecordsStep<>( control(), config, nodeStore ) );
add( new UpdateRecordsStep<>( control(), config, nodeStore, new StorePrepareIdSequence<>() ) );
}
}
Expand Up @@ -20,8 +20,10 @@
package org.neo4j.unsafe.impl.batchimport;

import java.util.Collection;
import java.util.function.LongFunction;

import org.neo4j.kernel.impl.store.RecordStore;
import org.neo4j.kernel.impl.store.id.IdSequence;
import org.neo4j.kernel.impl.store.id.validation.IdValidator;
import org.neo4j.kernel.impl.store.record.AbstractBaseRecord;
import org.neo4j.unsafe.impl.batchimport.staging.BatchSender;
Expand All @@ -31,6 +33,7 @@
import org.neo4j.unsafe.impl.batchimport.stats.Keys;
import org.neo4j.unsafe.impl.batchimport.stats.Stat;
import org.neo4j.unsafe.impl.batchimport.stats.StatsProvider;
import org.neo4j.unsafe.impl.batchimport.store.PrepareIdSequence;

/**
* Updates a batch of records to a store.
Expand All @@ -41,36 +44,35 @@ public class UpdateRecordsStep<RECORD extends AbstractBaseRecord>
{
protected final RecordStore<RECORD> store;
private final int recordSize;
private final PrepareIdSequence<RECORD> prepareIdSequence;
private long recordsUpdated;

public UpdateRecordsStep( StageControl control, Configuration config, RecordStore<RECORD> store )
public UpdateRecordsStep( StageControl control, Configuration config, RecordStore<RECORD> store,
PrepareIdSequence<RECORD> prepareIdSequence )
{
super( control, "v", config, config.parallelRecordWrites() ? 0 : 1 );
this.store = store;
this.prepareIdSequence = prepareIdSequence;
this.recordSize = store.getRecordSize();
}

@Override
protected void process( RECORD[] batch, BatchSender sender ) throws Throwable
{
LongFunction<IdSequence> idSequence = prepareIdSequence.apply( store );
int recordsUpdatedInThisBatch = 0;
for ( RECORD record : batch )
{
if ( record != null && record.inUse() && !IdValidator.isReservedId( record.getId() ) )
{
update( record );
store.prepareForCommit( record, idSequence.apply( record.getId() ) );
store.updateRecord( record );
recordsUpdatedInThisBatch++;
}
}
recordsUpdated += recordsUpdatedInThisBatch;
}

protected void update( RECORD record ) throws Throwable
{
store.prepareForCommit( record );
store.updateRecord( record );
}

@Override
protected void collectStatsProviders( Collection<StatsProvider> into )
{
Expand Down
Expand Up @@ -22,6 +22,7 @@
import org.neo4j.kernel.impl.store.RecordStore;
import org.neo4j.kernel.impl.store.record.RelationshipGroupRecord;
import org.neo4j.unsafe.impl.batchimport.staging.Stage;
import org.neo4j.unsafe.impl.batchimport.store.StorePrepareIdSequence;

import static org.neo4j.unsafe.impl.batchimport.RelationshipGroupCache.GROUP_ENTRY_SIZE;

Expand All @@ -47,6 +48,6 @@ public WriteGroupsStage( Configuration config, RelationshipGroupCache cache,
super( NAME, null, config, 0 );
add( new ReadGroupsFromCacheStep( control(), config, cache.iterator(), GROUP_ENTRY_SIZE ) );
add( new EncodeGroupsStep( control(), config, store ) );
add( new UpdateRecordsStep<>( control(), config, store ) );
add( new UpdateRecordsStep<>( control(), config, store, new StorePrepareIdSequence<>() ) );
}
}
Expand Up @@ -173,6 +173,7 @@ public void pruneAndOpenExistingStore( Predicate<StoreType> mainStoresToKeep, Pr
deleteStoreFiles( TEMP_NEOSTORE_NAME, tempStoresToKeep );
deleteStoreFiles( DEFAULT_NAME, mainStoresToKeep );
instantiateStores();
neoStores.makeStoreOk();
}

private void deleteStoreFiles( String storeName, Predicate<StoreType> storesToKeep )
Expand Down
@@ -0,0 +1,40 @@
/*
* Copyright (c) 2002-2017 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.unsafe.impl.batchimport.store;

import java.util.function.Function;
import java.util.function.LongFunction;

import org.neo4j.kernel.impl.store.CommonAbstractStore;
import org.neo4j.kernel.impl.store.RecordStore;
import org.neo4j.kernel.impl.store.id.IdSequence;
import org.neo4j.kernel.impl.store.record.AbstractBaseRecord;

/**
 * Hook through which the importer supplies the {@link IdSequence} that gets handed to
 * {@link CommonAbstractStore#prepareForCommit(AbstractBaseRecord, IdSequence)}, so that id sequence
 * behaviour specific to import can be plugged in. Applied to a {@link RecordStore} it yields a
 * {@link LongFunction} which, given a record id, provides the {@link IdSequence} to use for that record.
 *
 * @param <RECORD> type of record held by the store this is applied to.
 */
public interface PrepareIdSequence<RECORD extends AbstractBaseRecord>
        extends Function<RecordStore<RECORD>,LongFunction<IdSequence>>
{
    /**
     * Picks one of the two available implementations.
     *
     * @param doubleUnits {@code true} to get a {@link SecondaryUnitPrepareIdSequence},
     * {@code false} to get a {@link StorePrepareIdSequence}.
     * @param <RECORD> type of record held by the store the result will be applied to.
     * @return a new instance of the selected implementation.
     */
    static <RECORD extends AbstractBaseRecord> PrepareIdSequence<RECORD> of( boolean doubleUnits )
    {
        if ( doubleUnits )
        {
            return new SecondaryUnitPrepareIdSequence<>();
        }
        return new StorePrepareIdSequence<>();
    }
}

0 comments on commit 0ce1b40

Please sign in to comment.