Skip to content

Commit

Permalink
Importer can read batches of records in parallel
Browse files Browse the repository at this point in the history
This may result in actual parallel reads from the underlying storage,
i.e. multiple pages being read in parallel. Many environments will benefit
from this parallelism.
  • Loading branch information
tinwelint committed Apr 27, 2017
1 parent b5bfe6e commit e303830
Show file tree
Hide file tree
Showing 13 changed files with 144 additions and 303 deletions.
Expand Up @@ -21,6 +21,7 @@

import org.neo4j.kernel.impl.store.RecordStore;
import org.neo4j.kernel.impl.store.record.RelationshipGroupRecord;
import org.neo4j.unsafe.impl.batchimport.staging.BatchFeedStep;
import org.neo4j.unsafe.impl.batchimport.staging.ReadRecordsStep;
import org.neo4j.unsafe.impl.batchimport.staging.Stage;

Expand All @@ -42,8 +43,8 @@ public CountGroupsStage( Configuration config, RecordStore<RelationshipGroupReco
RelationshipGroupCache groupCache )
{
super( "Count groups", config );

add( new ReadRecordsStep<>( control(), config, store, allIn( store, config ) ) );
add( new BatchFeedStep( control(), config, allIn( store, config ), store.getRecordSize() ) );
add( new ReadRecordsStep<>( control(), config, store ) );
add( new CountGroupsStep( control(), config, groupCache ) );
}
}
Expand Up @@ -22,6 +22,7 @@
import org.neo4j.kernel.impl.api.CountsAccessor;
import org.neo4j.kernel.impl.store.NodeStore;
import org.neo4j.unsafe.impl.batchimport.cache.NodeLabelsCache;
import org.neo4j.unsafe.impl.batchimport.staging.BatchFeedStep;
import org.neo4j.unsafe.impl.batchimport.staging.ReadRecordsStep;
import org.neo4j.unsafe.impl.batchimport.staging.Stage;
import org.neo4j.unsafe.impl.batchimport.stats.StatsProvider;
Expand All @@ -38,7 +39,8 @@ public NodeCountsStage( Configuration config, NodeLabelsCache cache, NodeStore n
int highLabelId, CountsAccessor.Updater countsUpdater, StatsProvider... additionalStatsProviders )
{
super( "Node counts", config );
add( new ReadRecordsStep<>( control(), config, nodeStore, allIn( nodeStore, config ) ) );
add( new BatchFeedStep( control(), config, allIn( nodeStore, config ), nodeStore.getRecordSize() ) );
add( new ReadRecordsStep<>( control(), config, nodeStore ) );
add( new RecordProcessorStep<>( control(), "COUNT", config, new NodeCountsProcessor(
nodeStore, cache, highLabelId, countsUpdater ), true, additionalStatsProviders ) );
}
Expand Down
Expand Up @@ -23,6 +23,7 @@
import org.neo4j.kernel.impl.store.record.NodeRecord;
import org.neo4j.kernel.impl.store.record.RelationshipGroupRecord;
import org.neo4j.unsafe.impl.batchimport.cache.ByteArray;
import org.neo4j.unsafe.impl.batchimport.staging.BatchFeedStep;
import org.neo4j.unsafe.impl.batchimport.staging.ReadRecordsStep;
import org.neo4j.unsafe.impl.batchimport.staging.Stage;

Expand All @@ -38,7 +39,8 @@ public NodeFirstGroupStage( Configuration config, RecordStore<RelationshipGroupR
RecordStore<NodeRecord> nodeStore, ByteArray cache )
{
super( "Node --> Group", config );
add( new ReadRecordsStep<>( control(), config, groupStore, allIn( groupStore, config ) ) );
add( new BatchFeedStep( control(), config, allIn( groupStore, config ), groupStore.getRecordSize() ) );
add( new ReadRecordsStep<>( control(), config, groupStore ) );
add( new NodeSetFirstGroupStep( control(), config, nodeStore, cache ) );
add( new UpdateRecordsStep<>( control(), config, nodeStore ) );
}
Expand Down
Expand Up @@ -23,17 +23,18 @@
import java.util.concurrent.ConcurrentHashMap;

import org.neo4j.kernel.impl.api.CountsAccessor;
import org.neo4j.kernel.impl.store.record.RelationshipRecord;
import org.neo4j.unsafe.impl.batchimport.cache.NodeLabelsCache;
import org.neo4j.unsafe.impl.batchimport.cache.NumberArrayFactory;
import org.neo4j.unsafe.impl.batchimport.staging.BatchSender;
import org.neo4j.unsafe.impl.batchimport.staging.ProcessorStep;
import org.neo4j.unsafe.impl.batchimport.staging.StageControl;

/**
* Processes relationship count data received from {@link ReadRelationshipCountsDataStep} and keeps
* the accumulated counts per thread. Aggregated when {@link #done()}.
* Processes relationship records, feeding them to {@link RelationshipCountsProcessor} which keeps
* the accumulated counts per thread. Aggregated in {@link #done()}.
*/
public class ProcessRelationshipCountsDataStep extends ProcessorStep<long[]>
public class ProcessRelationshipCountsDataStep extends ProcessorStep<RelationshipRecord[]>
{
private final NodeLabelsCache cache;
private final Map<Thread,RelationshipCountsProcessor> processors = new ConcurrentHashMap<>();
Expand All @@ -55,12 +56,13 @@ public ProcessRelationshipCountsDataStep( StageControl control, NodeLabelsCache
}

@Override
protected void process( long[] batch, BatchSender sender )
protected void process( RelationshipRecord[] batch, BatchSender sender )
{
RelationshipCountsProcessor processor = processor();
for ( int i = 0; i < batch.length; i++ )
{
processor.process( batch[i++], (int)batch[i++], batch[i] );
RelationshipRecord relationship = batch[i];
processor.process( relationship.getFirstNode(), relationship.getType(), relationship.getSecondNode() );
}
}

Expand Down

This file was deleted.

Expand Up @@ -23,20 +23,27 @@
import org.neo4j.kernel.impl.store.RelationshipStore;
import org.neo4j.unsafe.impl.batchimport.cache.NodeLabelsCache;
import org.neo4j.unsafe.impl.batchimport.cache.NumberArrayFactory;
import org.neo4j.unsafe.impl.batchimport.staging.BatchFeedStep;
import org.neo4j.unsafe.impl.batchimport.staging.ReadRecordsStep;
import org.neo4j.unsafe.impl.batchimport.staging.Stage;

import static org.neo4j.unsafe.impl.batchimport.RecordIdIterator.allIn;

/**
* Reads all records from {@link RelationshipStore} and processes the counts in them. Uses a {@link NodeLabelsCache}
* previously populated by e.g. {@link NodeCountsStage}.
*/
public class RelationshipCountsStage extends Stage
{
public RelationshipCountsStage( Configuration config, NodeLabelsCache cache, RelationshipStore relationshipStore,
public RelationshipCountsStage( Configuration config, NodeLabelsCache cache,
RelationshipStore relationshipStore,
int highLabelId, int highRelationshipTypeId, CountsAccessor.Updater countsUpdater,
NumberArrayFactory cacheFactory )
{
super( "Relationship counts", config );
add( new ReadRelationshipCountsDataStep( control(), config, relationshipStore ) );
add( new BatchFeedStep( control(), config, allIn( relationshipStore, config ),
relationshipStore.getRecordSize() ) );
add( new ReadRecordsStep<>( control(), config, relationshipStore ) );
add( new ProcessRelationshipCountsDataStep( control(), cache, config,
highLabelId, highRelationshipTypeId, countsUpdater, cacheFactory ) );
}
Expand Down
Expand Up @@ -22,8 +22,10 @@
import org.neo4j.kernel.impl.store.RelationshipStore;
import org.neo4j.kernel.impl.store.record.RelationshipRecord;
import org.neo4j.unsafe.impl.batchimport.cache.NodeRelationshipCache;
import org.neo4j.unsafe.impl.batchimport.staging.BatchFeedStep;
import org.neo4j.unsafe.impl.batchimport.staging.ReadRecordsStep;
import org.neo4j.unsafe.impl.batchimport.staging.Stage;

import static org.neo4j.unsafe.impl.batchimport.RecordIdIterator.backwards;

/**
Expand All @@ -48,8 +50,9 @@ public RelationshipLinkbackStage( String topic, Configuration config, Relationsh
NodeRelationshipCache cache, long lowRelationshipId, long highRelationshipId, int nodeTypes )
{
super( "Relationship --> Relationship" + topic, config );
add( new ReadRecordsStep<>( control(), config, store,
backwards( lowRelationshipId, highRelationshipId, config ) ) );
add( new BatchFeedStep( control(), config, backwards( lowRelationshipId, highRelationshipId, config ),
store.getRecordSize()) );
add( new ReadRecordsStep<>( control(), config, store ) );
add( new RelationshipLinkbackStep( control(), config, cache, nodeTypes ) );
add( new UpdateRecordsStep<>( control(), config, store ) );
}
Expand Down
Expand Up @@ -21,6 +21,7 @@

import org.neo4j.kernel.impl.store.RecordStore;
import org.neo4j.kernel.impl.store.record.RelationshipGroupRecord;
import org.neo4j.unsafe.impl.batchimport.staging.BatchFeedStep;
import org.neo4j.unsafe.impl.batchimport.staging.ReadRecordsStep;
import org.neo4j.unsafe.impl.batchimport.staging.Stage;

Expand All @@ -41,7 +42,8 @@ public ScanAndCacheGroupsStage( Configuration config, RecordStore<RelationshipGr
RelationshipGroupCache cache )
{
super( "Gather", config );
add( new ReadRecordsStep<>( control(), config, store, allInReversed( store, config ) ) );
add( new BatchFeedStep( control(), config, allInReversed( store, config ), store.getRecordSize() ) );
add( new ReadRecordsStep<>( control(), config, store ) );
add( new CacheGroupsStep( control(), config, cache ) );
}
}
@@ -0,0 +1,53 @@
/*
* Copyright (c) 2002-2017 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.unsafe.impl.batchimport.staging;

import org.neo4j.unsafe.impl.batchimport.Configuration;
import org.neo4j.unsafe.impl.batchimport.RecordIdIterator;

/**
 * Releases batches of record ids to be read, potentially in parallel, by downstream steps.
 * <p>
 * Each call to {@link #nextBatchOrNull(long, int)} hands out the next batch from the supplied
 * {@link RecordIdIterator}. The reported {@link #position()} is the number of record ids accounted for
 * so far multiplied by the record size given at construction.
 */
public class BatchFeedStep extends IoProducerStep
{
    private final RecordIdIterator ids;
    private final int recordSize;
    // Number of record ids accounted for so far; read by position().
    // NOTE(review): "+=" on a volatile field is not atomic. This presumably relies on a single thread
    // calling nextBatchOrNull, with volatile only providing visibility to position() — confirm
    // against IoProducerStep.
    private volatile long count;

    /**
     * @param control stage control, passed on to the super class.
     * @param config configuration, passed on to the super class.
     * @param ids source of the id batches this step releases.
     * @param recordSize size of one record, used to scale the id count in {@link #position()}.
     */
    public BatchFeedStep( StageControl control, Configuration config, RecordIdIterator ids, int recordSize )
    {
        super( control, config );
        this.ids = ids;
        this.recordSize = recordSize;
    }

    /**
     * Hands out the next batch of ids from the underlying iterator.
     *
     * @param ticket not used by this step.
     * @param batchSize number of ids to account for when a batch is released.
     * NOTE(review): the actual batch comes from {@link RecordIdIterator#nextBatch()}, whose size is not
     * visible here — presumably it matches {@code batchSize}; confirm.
     * @return whatever {@link RecordIdIterator#nextBatch()} returned, {@code null} included.
     */
    @Override
    protected Object nextBatchOrNull( long ticket, int batchSize )
    {
        Object batch = ids.nextBatch();
        if ( batch != null )
        {
            // Only advance when a batch was actually released, so that a call yielding
            // no batch does not push the reported position past what was handed out.
            count += batchSize;
        }
        return batch;
    }

    /**
     * @return number of ids accounted for so far multiplied by the record size.
     */
    @Override
    protected long position()
    {
        return count * recordSize;
    }
}

0 comments on commit e303830

Please sign in to comment.