Skip to content

Commit

Permalink
Importer correctly deletes everything about duplicate nodes
Browse files Browse the repository at this point in the history
Even dynamic label records and property records
  • Loading branch information
tinwelint committed Jan 5, 2018
1 parent caa95c2 commit e39b7d3
Show file tree
Hide file tree
Showing 8 changed files with 366 additions and 23 deletions.
Expand Up @@ -157,6 +157,11 @@ public boolean isInUse( PageCursor cursor )
for ( int i = 0; i < blocks; i++ )
{
long block = cursor.getLong();
// Since there's no inUse byte we have to check the special case of first block == 0, which will mean that it's deleted
if ( i == 0 && block == 0 )
{
return false;
}
if ( PropertyType.getPropertyTypeOrNull( block ) != null )
{
return true;
Expand Down
Expand Up @@ -46,24 +46,28 @@ public void deletePropertyChain( PrimitiveRecord primitive,

// TODO forChanging/forReading piggy-backing
PropertyRecord propRecord = propertyChange.forChangingData();
for ( PropertyBlock block : propRecord )
{
for ( DynamicRecord valueRecord : block.getValueRecords() )
{
assert valueRecord.inUse();
valueRecord.setInUse( false );
propRecord.addDeletedRecord( valueRecord );
}
}
deletePropertyRecordIncludingValueRecords( propRecord );
nextProp = propRecord.getNextProp();
propRecord.setInUse( false );
propRecord.setChanged( primitive );
// We do not remove them individually, but all together here
propRecord.clearPropertyBlocks();
}
primitive.setNextProp( Record.NO_NEXT_PROPERTY.intValue() );
}

public static void deletePropertyRecordIncludingValueRecords( PropertyRecord record )
{
for ( PropertyBlock block : record )
{
for ( DynamicRecord valueRecord : block.getValueRecords() )
{
assert valueRecord.inUse();
valueRecord.setInUse( false );
record.addDeletedRecord( valueRecord );
}
}
record.clearPropertyBlocks();
record.setInUse( false );
}

/**
* Removes property with given {@code propertyKey} from property chain owner by the primitive found in
* {@code primitiveProxy} if it exists.
Expand Down
Expand Up @@ -100,6 +100,26 @@ public void propertiesImported( long properties )
this.properties.add( properties );
}

public void propertiesRemoved( long properties )
{
this.properties.add( -properties );
}

public long nodesImported()
{
return this.nodes.sum();
}

public long propertiesImported()
{
return this.properties.sum();
}

public long relationshipsImported()
{
return this.relationships.sum();
}

@Override
public String toString()
{
Expand Down
Expand Up @@ -39,6 +39,7 @@ public DeleteDuplicateNodesStage( Configuration config, PrimitiveLongIterator du
BatchingNeoStores neoStore, DataImporter.Monitor storeMonitor )
{
super( "DEDUP", null, config, 0 );
add( new DeleteDuplicateNodesStep( control(), config, duplicateNodeIds, neoStore.getNodeStore(), storeMonitor ) );
add( new DeleteDuplicateNodesStep( control(), config, duplicateNodeIds, neoStore.getNodeStore(), neoStore.getPropertyStore(),
storeMonitor ) );
}
}
Expand Up @@ -22,42 +22,76 @@
import java.io.IOException;
import org.neo4j.collection.primitive.PrimitiveLongIterator;
import org.neo4j.kernel.impl.store.NodeStore;
import org.neo4j.kernel.impl.store.PropertyStore;
import org.neo4j.kernel.impl.store.RecordCursor;
import org.neo4j.kernel.impl.store.record.DynamicRecord;
import org.neo4j.kernel.impl.store.record.NodeRecord;
import org.neo4j.kernel.impl.store.record.PropertyRecord;
import org.neo4j.kernel.impl.store.record.Record;
import org.neo4j.unsafe.impl.batchimport.staging.LonelyProcessingStep;
import org.neo4j.unsafe.impl.batchimport.staging.StageControl;

import static org.neo4j.kernel.impl.store.record.RecordLoad.NORMAL;
import static org.neo4j.kernel.impl.transaction.state.PropertyDeleter.deletePropertyRecordIncludingValueRecords;

public class DeleteDuplicateNodesStep extends LonelyProcessingStep
{
private final NodeStore nodeStore;
private final PropertyStore propertyStore;
private final PrimitiveLongIterator nodeIds;
private final DataImporter.Monitor storeMonitor;

private int nodesRemoved;
private long nodesRemoved;
private long propertiesRemoved;

public DeleteDuplicateNodesStep( StageControl control, Configuration config, PrimitiveLongIterator nodeIds, NodeStore nodeStore,
DataImporter.Monitor storeMonitor )
PropertyStore propertyStore, DataImporter.Monitor storeMonitor )
{
super( control, "DEDUP", config );
this.nodeStore = nodeStore;
this.propertyStore = propertyStore;
this.nodeIds = nodeIds;
this.storeMonitor = storeMonitor;
}

@Override
protected void process() throws IOException
{
NodeRecord record = nodeStore.newRecord();
RecordCursor<NodeRecord> cursor = nodeStore.newRecordCursor( record ).acquire( 0, NORMAL );
while ( nodeIds.hasNext() )
NodeRecord nodeRecord = nodeStore.newRecord();
PropertyRecord propertyRecord = propertyStore.newRecord();
try ( RecordCursor<NodeRecord> cursor = nodeStore.newRecordCursor( nodeRecord ).acquire( 0, NORMAL );
RecordCursor<PropertyRecord> propertyCursor = propertyStore.newRecordCursor( propertyRecord ).acquire( 0, NORMAL ) )
{
long duplicateNodeId = nodeIds.next();
cursor.next( duplicateNodeId );
record.setInUse( false );
nodeStore.updateRecord( record );
nodesRemoved++;
while ( nodeIds.hasNext() )
{
long duplicateNodeId = nodeIds.next();
cursor.next( duplicateNodeId );
assert nodeRecord.inUse() : nodeRecord;
// Ensure heavy so that the dynamic label records gets loaded (and then deleted) too
nodeStore.ensureHeavy( nodeRecord );

// Delete property records
long nextProp = nodeRecord.getNextProp();
while ( !Record.NULL_REFERENCE.is( nextProp ) )
{
propertyCursor.next( nextProp );
assert propertyRecord.inUse() : propertyRecord + " for " + nodeRecord;
propertyStore.ensureHeavy( propertyRecord );
propertiesRemoved += propertyRecord.numberOfProperties();
nextProp = propertyRecord.getNextProp();
deletePropertyRecordIncludingValueRecords( propertyRecord );
propertyStore.updateRecord( propertyRecord );
}

// Delete node (and dynamic label records, if any)
nodeRecord.setInUse( false );
for ( DynamicRecord labelRecord : nodeRecord.getDynamicLabelRecords() )
{
labelRecord.setInUse( false );
}
nodeStore.updateRecord( nodeRecord );
nodesRemoved++;
}
}
}

Expand All @@ -66,5 +100,6 @@ public void close() throws Exception
{
super.close();
storeMonitor.nodesRemoved( nodesRemoved );
storeMonitor.propertiesRemoved( propertiesRemoved );
}
}
Expand Up @@ -95,6 +95,7 @@ private static Config configOf( String... config )
protected void after( boolean successful ) throws Throwable
{
IOUtils.closeAll( neoStores, rulePageCache );
neoStores = null;
if ( ruleFs != null )
{
ruleFs.close();
Expand Down

0 comments on commit e39b7d3

Please sign in to comment.