Skip to content

Commit

Permalink
Small comment fixes in NeoStoreRule and some importer steps
Browse files Browse the repository at this point in the history
  • Loading branch information
tinwelint committed May 9, 2017
1 parent 655a84b commit aad15da
Show file tree
Hide file tree
Showing 14 changed files with 114 additions and 111 deletions.
Expand Up @@ -47,7 +47,7 @@ public class StoreProcessorTest
public void shouldProcessAllTheRecordsInAStore() throws Exception
{
// given
RecordStore<NodeRecord> nodeStore = stores.open().getNodeStore();
RecordStore<NodeRecord> nodeStore = stores.builder().build().getNodeStore();
ConsistencyReport.Reporter reporter = mock( ConsistencyReport.Reporter.class );
StoreProcessor processor = new StoreProcessor( CheckDecorator.NONE,
reporter, Stage.SEQUENTIAL_FORWARD, CacheAccess.EMPTY );
Expand Down Expand Up @@ -76,7 +76,8 @@ public void shouldStopProcessingRecordsWhenSignalledToStop() throws Exception
ConsistencyReport.Reporter reporter = mock( ConsistencyReport.Reporter.class );
StoreProcessor processor = new StoreProcessor( CheckDecorator.NONE,
reporter, Stage.SEQUENTIAL_FORWARD, CacheAccess.EMPTY );
RecordStore<NodeRecord> nodeStore = new RecordStore.Delegator<NodeRecord>( stores.open().getNodeStore() )
RecordStore<NodeRecord> nodeStore = new RecordStore.Delegator<NodeRecord>(
stores.builder().build().getNodeStore() )
{
@Override
public RecordCursor<NodeRecord> newRecordCursor( NodeRecord record )
Expand Down
Expand Up @@ -34,7 +34,7 @@ public void offer( long candidate )
do
{
currentHighest = highestId.get();
if ( candidate < currentHighest )
if ( candidate <= currentHighest )
{
return;
}
Expand Down
Expand Up @@ -343,7 +343,7 @@ private void importRelationships( NodeRelationshipCache nodeRelationshipCache,
executeStage( new RelationshipGroupStage( topic, groupConfig,
neoStore.getTemporaryRelationshipGroupStore(), nodeRelationshipCache ) );
// Set node nextRel fields
executeStage( new NodeFirstRelationshipStage( topic, nodeConfig, neoStore.getNodeStore(),
executeStage( new SparseNodeFirstRelationshipStage( topic, nodeConfig, neoStore.getNodeStore(),
nodeRelationshipCache ) );

// Link relationship chains together for dense nodes
Expand Down
Expand Up @@ -48,8 +48,6 @@
* Encodes property data into {@link PropertyRecord property records}, attaching them to each
* {@link Batch}. This step is designed to handle multiple threads doing the property encoding,
* since property encoding is potentially the most costly step in this {@link Stage}.
* The delivered {@link PropertyRecord property records} all have local ids and so reassignment of those
* ids will have to be done later.
*/
public class PropertyEncoderStep<RECORD extends PrimitiveRecord,INPUT extends InputEntity>
extends ProcessorStep<Batch<INPUT,RECORD>>
Expand Down
Expand Up @@ -20,10 +20,15 @@
package org.neo4j.unsafe.impl.batchimport;

import org.neo4j.kernel.impl.store.RecordStore;
import org.neo4j.kernel.impl.store.RelationshipGroupStore;
import org.neo4j.kernel.impl.store.record.RelationshipGroupRecord;
import org.neo4j.unsafe.impl.batchimport.cache.NodeRelationshipCache;
import org.neo4j.unsafe.impl.batchimport.staging.Stage;

/**
* Takes information about relationship groups in the {@link NodeRelationshipCache}, which is produced
* as a side-effect of linking relationships together, and writes them out into {@link RelationshipGroupStore}.
*/
public class RelationshipGroupStage extends Stage
{
public RelationshipGroupStage( String topic, Configuration config,
Expand Down
Expand Up @@ -20,22 +20,18 @@
package org.neo4j.unsafe.impl.batchimport;

import org.neo4j.kernel.impl.store.record.NodeRecord;
import org.neo4j.kernel.impl.store.record.RelationshipGroupRecord;
import org.neo4j.unsafe.impl.batchimport.cache.NodeRelationshipCache;

/**
* Sets the {@link NodeRecord#setNextRel(long) relationship field} on all {@link NodeRecord nodes}.
* This is done after all relationships have been imported and the {@link NodeRelationshipCache node cache}
* points to the first relationship for each node.
*
* This step also creates {@link RelationshipGroupRecord group records} for the dense nodes as it encounters
* dense nodes, where it gets all relationship group information from {@link NodeRelationshipCache}.
* Sets the {@link NodeRecord#setNextRel(long) relationship field} on sparse nodes.
* This is done after all sparse node relationship links have been done and the {@link NodeRelationshipCache node cache}
* points to the first relationship for sparse each node.
*/
public class NodeFirstRelationshipProcessor implements RecordProcessor<NodeRecord>
public class SparseNodeFirstRelationshipProcessor implements RecordProcessor<NodeRecord>
{
private final NodeRelationshipCache cache;

public NodeFirstRelationshipProcessor( NodeRelationshipCache cache )
public SparseNodeFirstRelationshipProcessor( NodeRelationshipCache cache )
{
this.cache = cache;
}
Expand Down
Expand Up @@ -21,7 +21,6 @@

import org.neo4j.kernel.impl.store.NodeStore;
import org.neo4j.kernel.impl.store.record.NodeRecord;
import org.neo4j.kernel.impl.store.record.RelationshipGroupRecord;
import org.neo4j.unsafe.impl.batchimport.cache.NodeRelationshipCache;
import org.neo4j.unsafe.impl.batchimport.cache.NodeType;
import org.neo4j.unsafe.impl.batchimport.staging.ReadRecordsStep;
Expand All @@ -30,27 +29,26 @@
import static org.neo4j.unsafe.impl.batchimport.staging.Step.ORDER_SEND_DOWNSTREAM;

/**
* Updates {@link NodeRecord node records} with relationship/group chain heads after relationship import. Steps:
* Updates sparse {@link NodeRecord node records} with relationship heads after relationship linking. Steps:
*
* <ol>
* <li>{@link ReadNodeIdsByCacheStep} looks at {@link NodeRelationshipCache} for which nodes have had
* relationships imported and loads those {@link NodeRecord records} from store.</li>
* <li>{@link RecordProcessorStep} / {@link NodeFirstRelationshipProcessor} uses {@link NodeRelationshipCache}
* to update each {@link NodeRecord#setNextRel(long)}. For dense nodes {@link RelationshipGroupRecord group records}
* are created and set as {@link NodeRecord#setNextRel(long)}.</li>
* <li>{@link RecordProcessorStep} / {@link SparseNodeFirstRelationshipProcessor} uses {@link NodeRelationshipCache}
* to update each {@link NodeRecord#setNextRel(long)}.
* <li>{@link UpdateRecordsStep} writes the updated records back into store.</li>
* </ol>
*/
public class NodeFirstRelationshipStage extends Stage
public class SparseNodeFirstRelationshipStage extends Stage
{
public NodeFirstRelationshipStage( String topic, Configuration config, NodeStore nodeStore,
public SparseNodeFirstRelationshipStage( String topic, Configuration config, NodeStore nodeStore,
NodeRelationshipCache cache )
{
super( "Node --> Relationship" + topic, config, ORDER_SEND_DOWNSTREAM );
add( new ReadNodeIdsByCacheStep( control(), config, cache, NodeType.NODE_TYPE_SPARSE ) );
add( new ReadRecordsStep<>( control(), config, true, nodeStore ) );
add( new RecordProcessorStep<>( control(), "LINK", config,
new NodeFirstRelationshipProcessor( cache ), false ) );
new SparseNodeFirstRelationshipProcessor( cache ), false ) );
add( new UpdateRecordsStep<>( control(), config, nodeStore ) );
}
}
Expand Up @@ -245,10 +245,4 @@ public String toString()
return format( "%s[%s, processors:%d, batches:%d", getClass().getSimpleName(),
name, processors( 0 ), doneBatches.get() );
}

@Override
public long doneBatches()
{
return doneBatches.get();
}
}
Expand Up @@ -93,6 +93,4 @@ public interface Step<T> extends Parallelizable, AutoCloseable, Panicable
*/
@Override
void close() throws Exception;

long doneBatches();
}
Expand Up @@ -45,7 +45,7 @@ public class HighIdTransactionApplierTest
public void shouldUpdateHighIdsOnExternalTransaction() throws Exception
{
// GIVEN
NeoStores neoStores = neoStoresRule.open();
NeoStores neoStores = neoStoresRule.builder().build();
HighIdTransactionApplier tracker = new HighIdTransactionApplier( neoStores );

// WHEN
Expand Down Expand Up @@ -108,7 +108,7 @@ public void shouldUpdateHighIdsOnExternalTransaction() throws Exception
public void shouldTrackSecondaryUnitIdsAsWell() throws Exception
{
// GIVEN
NeoStores neoStores = neoStoresRule.open();
NeoStores neoStores = neoStoresRule.builder().build();
HighIdTransactionApplier tracker = new HighIdTransactionApplier( neoStores );

NodeRecord node = new NodeRecord( 5 ).initialize( true, 123, true, 456, 0 );
Expand Down

0 comments on commit aad15da

Please sign in to comment.