Skip to content

Commit

Permalink
Removed the concept of specific ids
Browse files Browse the repository at this point in the history
because it clashes conceptually with the per-type import.
This means that migration, when store files actually need to be
migrated, will break any legacy relationship indexes. This will have
to be communicated with a warning during migration and, ideally, in
the manual.
  • Loading branch information
tinwelint committed Apr 25, 2016
1 parent 286a96a commit ef87ac4
Show file tree
Hide file tree
Showing 31 changed files with 162 additions and 363 deletions.
Expand Up @@ -169,7 +169,7 @@ public void shouldImportCsvData() throws Exception
inserter.doImport( Inputs.input(
nodes( nodeRandomSeed, NODE_COUNT, inputIdGenerator, groups ),
relationships( relationshipRandomSeed, RELATIONSHIP_COUNT, inputIdGenerator, groups ),
idMapper, idGenerator, false,
idMapper, idGenerator,
/*insanely high bad tolerance, but it will actually never be that many*/
silentBadCollector( RELATIONSHIP_COUNT ) ) );

Expand Down
Expand Up @@ -105,12 +105,6 @@ public IdGenerator idGenerator()
return idType.idGenerator();
}

@Override
public boolean specificRelationshipIds()
{
return false;
}

@Override
public Collector badCollector()
{
Expand Down
Expand Up @@ -155,6 +155,9 @@ public void migrate( File storeDir, File migrationDir, MigrationProgressMonitor.
RecordFormats newFormat = selectForVersion( versionToMigrateTo );
if ( !oldFormat.equals( newFormat ) )
{
// TODO if this store has relationship indexes then warn the user that they will be incorrect
// after migration, because the relationship ids are now being rewritten.

// Some form of migration is required (a fallback/catch-all option)
migrateWithBatchImporter( storeDir, migrationDir,
lastTxId, lastTxChecksum, lastTxLogPosition.getLogVersion(), lastTxLogPosition.getByteOffset(),
Expand Down Expand Up @@ -357,7 +360,7 @@ private void migrateWithBatchImporter( File storeDir, File migrationDir, long la
InputIterable<InputRelationship> relationships =
legacyRelationshipsAsInput( legacyStore, requiresPropertyMigration );
importer.doImport(
Inputs.input( nodes, relationships, IdMappers.actual(), IdGenerators.fromInput(), true,
Inputs.input( nodes, relationships, IdMappers.actual(), IdGenerators.fromInput(),
Collectors.badCollector( badOutput, 0 ) ) );

// During migration the batch importer doesn't necessarily write all entities, depending on
Expand Down Expand Up @@ -507,7 +510,6 @@ protected InputRelationship inputEntityOf( RelationshipRecord record )
InputEntity.NO_PROPERTIES, record.getNextProp(),
record.getFirstNode(), record.getSecondNode(), null, record.getType() );
propertyDecorator.accept( result, record );
result.setSpecificId( record.getId() );
return result;
}
};
Expand Down
Expand Up @@ -41,17 +41,7 @@ public CalculateRelationshipsStep( StageControl control, Configuration config, R
@Override
protected void process( Batch<InputRelationship,RelationshipRecord> batch, BatchSender sender ) throws Throwable
{
int batchSize = batch.input.length;
InputRelationship inputRelationship = batch.input[batchSize - 1];

if ( inputRelationship.hasSpecificId() )
{
maxSpecific = Math.max( inputRelationship.specificId(), maxSpecific );
}
else
{
numberOfRelationships += batchSize;
}
numberOfRelationships += batch.input.length;
sender.send( batch );
}

Expand Down
Expand Up @@ -22,9 +22,12 @@
import org.neo4j.kernel.impl.api.CountsAccessor;
import org.neo4j.kernel.impl.store.NodeStore;
import org.neo4j.unsafe.impl.batchimport.cache.NodeLabelsCache;
import org.neo4j.unsafe.impl.batchimport.staging.ReadRecordsStep;
import org.neo4j.unsafe.impl.batchimport.staging.Stage;
import org.neo4j.unsafe.impl.batchimport.stats.StatsProvider;

import static org.neo4j.unsafe.impl.batchimport.RecordIdIteration.allIn;

/**
* Reads all records from {@link NodeStore} and processes the counts in them, populating {@link NodeLabelsCache}
* for later use by {@link RelationshipCountsStage}.
Expand All @@ -35,7 +38,7 @@ public NodeCountsStage( Configuration config, NodeLabelsCache cache, NodeStore n
int highLabelId, CountsAccessor.Updater countsUpdater, StatsProvider... additionalStatsProviders )
{
super( "Node counts", config );
add( new ReadNodeRecordsStep( control(), config, nodeStore ) );
add( new ReadRecordsStep<>( control(), config, nodeStore, allIn( nodeStore ) ) );
add( new RecordProcessorStep<>( control(), "COUNT", config, new NodeCountsProcessor(
nodeStore, cache, highLabelId, countsUpdater ), true, additionalStatsProviders ) );
}
Expand Down
Expand Up @@ -21,7 +21,6 @@

import java.io.File;
import java.io.IOException;

import org.neo4j.collection.primitive.PrimitiveLongIterator;
import org.neo4j.helpers.Exceptions;
import org.neo4j.helpers.Format;
Expand Down Expand Up @@ -174,7 +173,7 @@ public void doImport( Input input ) throws IOException
executeStages( nodeStage, calculateDenseNodesStage );
}

importRelationships( input, nodeRelationshipCache, storeUpdateMonitor, neoStore, badCollector, writeMonitor,
importRelationships( nodeRelationshipCache, storeUpdateMonitor, neoStore, writeMonitor,
idMapper, cachedRelationships,
inputCache, calculateDenseNodesStage.getRelationshipTypes( 100 ) );

Expand Down Expand Up @@ -225,8 +224,8 @@ public void doImport( Input input ) throws IOException
}
}

private void importRelationships( Input input, NodeRelationshipCache nodeRelationshipCache,
CountingStoreUpdateMonitor storeUpdateMonitor, BatchingNeoStores neoStore, Collector badCollector,
private void importRelationships( NodeRelationshipCache nodeRelationshipCache,
CountingStoreUpdateMonitor storeUpdateMonitor, BatchingNeoStores neoStore,
IoMonitor writeMonitor, IdMapper idMapper, InputIterable<InputRelationship> relationships,
InputCache inputCache, Object[] allRelationshipTypes )
{
Expand All @@ -245,6 +244,7 @@ private void importRelationships( Input input, NodeRelationshipCache nodeRelatio
PerTypeRelationshipSplitter perTypeIterator =
new PerTypeRelationshipSplitter( relationships.iterator(), allRelationshipTypes,
type -> neoStore.getRelationshipTypeRepository().getOrCreateId( type ), inputCache );
RelationshipStore relationshipStore = neoStore.getRelationshipStore();

long nextRelationshipId = 0;
for ( int i = 0; perTypeIterator.hasNext(); i++ )
Expand All @@ -253,13 +253,14 @@ private void importRelationships( Input input, NodeRelationshipCache nodeRelatio
nodeRelationshipCache.setForwardScan( true );
Object currentType = perTypeIterator.currentType();
int currentTypeId = neoStore.getRelationshipTypeRepository().getOrCreateId( currentType );

System.out.println( "------------- " + currentType + "(" + currentTypeId + ")" );

InputIterator<InputRelationship> perType = perTypeIterator.next();
String topic = " [:" + currentType + "] (" +
(i+1) + "/" + allRelationshipTypes.length + ")";
final RelationshipStage relationshipStage = new RelationshipStage( topic, config, writeMonitor,
perType, idMapper,
neoStore, nodeRelationshipCache, input.specificRelationshipIds(), storeUpdateMonitor,
nextRelationshipId );
perType, idMapper, neoStore, nodeRelationshipCache, storeUpdateMonitor, nextRelationshipId );
executeStages( relationshipStage );

// Stage 4a -- set node nextRel fields for dense nodes
Expand Down

This file was deleted.

Expand Up @@ -26,6 +26,8 @@
import org.neo4j.unsafe.impl.batchimport.staging.ReadRecordsStep;
import org.neo4j.unsafe.impl.batchimport.staging.StageControl;

import static org.neo4j.unsafe.impl.batchimport.RecordIdIteration.allIn;

/**
* Reads from {@link RelationshipStore} and produces batches of startNode,type,endNode values for
* others to process. The result is one long[] with all values in.
Expand All @@ -37,7 +39,7 @@ public class ReadRelationshipCountsDataStep extends ReadRecordsStep<Relationship

public ReadRelationshipCountsDataStep( StageControl control, Configuration config, RelationshipStore store )
{
super( control, config, store );
super( control, config, store, allIn( store ) );
this.highestId = highId - 1;
}

Expand Down

This file was deleted.

@@ -0,0 +1,42 @@
package org.neo4j.unsafe.impl.batchimport;

import org.neo4j.collection.primitive.PrimitiveLongCollections;
import org.neo4j.collection.primitive.PrimitiveLongIterator;
import org.neo4j.kernel.impl.store.RecordStore;
import org.neo4j.kernel.impl.store.record.AbstractBaseRecord;

/**
 * Factory methods producing {@link PrimitiveLongIterator iterators} over contiguous ranges of record ids.
 */
public class RecordIdIteration
{
    /**
     * Iterates ids in descending order, from {@code highExcluded - 1} down to and including {@code lowIncluded}.
     *
     * @param highExcluded upper bound of the range, exclusive.
     * @param lowIncluded lower bound of the range, inclusive.
     * @return iterator over the ids in the range, highest id first.
     */
    public static final PrimitiveLongIterator backwards( long highExcluded, long lowIncluded )
    {
        return new PrimitiveLongCollections.PrimitiveLongBaseIterator()
        {
            // The id to hand out on the next fetch
            private long cursor = highExcluded - 1;

            @Override
            protected boolean fetchNext()
            {
                if ( cursor < lowIncluded )
                {
                    // Stepped below the lower bound, nothing more to hand out
                    return false;
                }
                return next( cursor-- );
            }
        };
    }

    /**
     * Iterates ids in ascending order, from {@code lowIncluded} up to and including {@code highExcluded - 1}.
     *
     * @param lowIncluded lower bound of the range, inclusive.
     * @param highExcluded upper bound of the range, exclusive.
     * @return iterator over the ids in the range, lowest id first.
     */
    public static final PrimitiveLongIterator forwards( long lowIncluded, long highExcluded )
    {
        return new PrimitiveLongCollections.PrimitiveLongBaseIterator()
        {
            // The id to hand out on the next fetch
            private long cursor = lowIncluded;

            @Override
            protected boolean fetchNext()
            {
                if ( cursor >= highExcluded )
                {
                    // Reached the upper bound, nothing more to hand out
                    return false;
                }
                return next( cursor++ );
            }
        };
    }

    /**
     * Iterates ids in ascending order, starting at the number of reserved low ids of the given store
     * and stopping before its high id.
     *
     * @param store {@link RecordStore} supplying both bounds of the range.
     * @return iterator over the ids in that range, lowest id first.
     */
    public static PrimitiveLongIterator allIn( RecordStore<? extends AbstractBaseRecord> store )
    {
        long firstId = store.getNumberOfReservedLowIds();
        long highId = store.getHighId();
        return forwards( firstId, highId );
    }
}
Expand Up @@ -45,23 +45,14 @@ public class RelationshipEncoderStep extends ProcessorStep<Batch<InputRelationsh
private final NodeRelationshipCache cache;
private final ParallelizationCoordinator parallelization = new ParallelizationCoordinator();

// There are two "modes" in generating relationship ids
// - ids are decided by InputRelationship#id() (f.ex. store migration, where ids should be kept intact).
// nextRelationshipId will not be used, and all InputRelationships will have to specify ids
// - ids are incremented for each one, starting at a specific id (0 on empty db)
// nextRelationshipId is used and _no_ id from InputRelationship is used, rather no id is allowed to be specified.
private final boolean specificIds;

public RelationshipEncoderStep( StageControl control,
Configuration config,
BatchingTokenRepository<?, ?> relationshipTypeRepository,
NodeRelationshipCache cache,
boolean specificIds )
NodeRelationshipCache cache )
{
super( control, "RELATIONSHIP", config, 0 );
this.relationshipTypeRepository = relationshipTypeRepository;
this.cache = cache;
this.specificIds = specificIds;
}

@Override
Expand All @@ -80,12 +71,7 @@ protected void process( Batch<InputRelationship,RelationshipRecord> batch, Batch
for ( int i = 0; i < input.length; i++ )
{
InputRelationship batchRelationship = input[i];
if ( specificIds != batchRelationship.hasSpecificId() )
{
throw new IllegalStateException( "Input was declared to have specificRelationshipIds=" +
specificIds + ", but " + batchRelationship + " didn't honor that" );
}
long relationshipId = specificIds ? batchRelationship.specificId() : nextRelationshipId++;
long relationshipId = nextRelationshipId++;
// Ids have been verified to exist in CalculateDenseNodeStep
long startNodeId = ids[i*2];
long endNodeId = ids[i*2+1];
Expand Down

0 comments on commit ef87ac4

Please sign in to comment.