Skip to content

Commit

Permalink
Fixes import issue where relationships could get wrong sets of proper…
Browse files Browse the repository at this point in the history
…ties

this would happen on batches where there were skipped relationships. Any
relationships after skipped relationships in that batch would have their
sets of properties shifted the number of steps there were skipped
relationships at that position. So the import would create a consistent
database, just semantically incorrect.
  • Loading branch information
tinwelint committed Mar 27, 2015
1 parent aa648b1 commit e65d2b6
Show file tree
Hide file tree
Showing 2 changed files with 154 additions and 52 deletions.
Expand Up @@ -28,8 +28,10 @@
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.UUID;

import org.neo4j.function.primitive.PrimitiveIntPredicate;
Expand All @@ -39,7 +41,7 @@
import org.neo4j.graphdb.Node;
import org.neo4j.graphdb.Relationship;
import org.neo4j.graphdb.Transaction;
import org.neo4j.helpers.Triplet;
import org.neo4j.helpers.Predicate;
import org.neo4j.helpers.collection.PrefetchingIterator;
import org.neo4j.io.fs.FileUtils;
import org.neo4j.kernel.impl.util.Validator;
Expand All @@ -53,6 +55,7 @@

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

Expand All @@ -62,10 +65,13 @@

import static org.neo4j.collection.primitive.PrimitiveIntCollections.alwaysTrue;
import static org.neo4j.graphdb.DynamicLabel.label;
import static org.neo4j.graphdb.DynamicRelationshipType.withName;
import static org.neo4j.helpers.ArrayUtil.join;
import static org.neo4j.helpers.Exceptions.contains;
import static org.neo4j.helpers.Exceptions.withMessage;
import static org.neo4j.helpers.collection.Iterables.filter;
import static org.neo4j.helpers.collection.IteratorUtil.count;
import static org.neo4j.helpers.collection.IteratorUtil.singleOrNull;
import static org.neo4j.tooling.ImportTool.MULTI_FILE_DELIMITER;

public class ImportToolTest
Expand Down Expand Up @@ -228,10 +234,10 @@ public void shouldImportGroupsOfOverlappingIds() throws Exception
// GIVEN
List<String> groupOneNodeIds = asList( "1", "2", "3" );
List<String> groupTwoNodeIds = asList( "4", "5", "2" );
List<Triplet<String,String,String>> rels = asList(
Triplet.of( "1", "4", "TYPE" ),
Triplet.of( "2", "5", "TYPE" ),
Triplet.of( "3", "2", "TYPE" ) );
List<RelationshipDataLine> rels = asList(
relationship( "1", "4", "TYPE" ),
relationship( "2", "5", "TYPE" ),
relationship( "3", "2", "TYPE" ) );
Configuration config = Configuration.COMMAS;
String groupOne = "Actor";
String groupTwo = "Movie";
Expand Down Expand Up @@ -342,13 +348,13 @@ public void shouldLogRelationshipsReferingToMissingNode() throws Exception
List<String> nodeIds = asList( "a", "b", "c" );
Configuration config = Configuration.COMMAS;
File nodeData = nodeData( true, config, nodeIds, alwaysTrue() );
List<Triplet<String,String,String>> relationships = Arrays.asList(
List<RelationshipDataLine> relationships = Arrays.asList(
// header line 1 of file1
Triplet.of( "a", "b", "TYPE" ), // line 2 of file1
Triplet.of( "c", "bogus", "TYPE" ), // line 3 of file1
Triplet.of( "b", "c", "KNOWS" ), // line 1 of file2
Triplet.of( "c", "a", "KNOWS" ), // line 2 of file2
Triplet.of( "missing", "a", "KNOWS" ) ); // line 3 of file2
relationship( "a", "b", "TYPE", "aa" ), // line 2 of file1
relationship( "c", "bogus", "TYPE", "bb" ), // line 3 of file1
relationship( "b", "c", "KNOWS", "cc" ), // line 1 of file2
relationship( "c", "a", "KNOWS", "dd" ), // line 2 of file2
relationship( "missing", "a", "KNOWS", "ee" ) ); // line 3 of file2
File relationshipData1 = relationshipData( true, config, relationships.iterator(), lines( 0, 2 ), true );
File relationshipData2 = relationshipData( false, config, relationships.iterator(), lines( 2, 5 ), true );
File bad = file( "bad.log" );
Expand All @@ -368,6 +374,7 @@ public void shouldLogRelationshipsReferingToMissingNode() throws Exception
badContents.contains( relationshipData1.getAbsolutePath() + ":3" ) );
assertTrue( "Didn't contain second bad relationship",
badContents.contains( relationshipData2.getAbsolutePath() + ":3" ) );
verifyRelationships( relationships );
}

@Test
Expand All @@ -377,13 +384,13 @@ public void shouldFailIfTooManyBadRelationships() throws Exception
List<String> nodeIds = asList( "a", "b", "c" );
Configuration config = Configuration.COMMAS;
File nodeData = nodeData( true, config, nodeIds, alwaysTrue() );
List<Triplet<String,String,String>> relationships = Arrays.asList(
List<RelationshipDataLine> relationships = Arrays.asList(
// header line 1 of file1
Triplet.of( "a", "b", "TYPE" ), // line 2 of file1
Triplet.of( "c", "bogus", "TYPE" ), // line 3 of file1
Triplet.of( "b", "c", "KNOWS" ), // line 1 of file2
Triplet.of( "c", "a", "KNOWS" ), // line 2 of file2
Triplet.of( "missing", "a", "KNOWS" ) ); // line 3 of file2
relationship( "a", "b", "TYPE" ), // line 2 of file1
relationship( "c", "bogus", "TYPE" ), // line 3 of file1
relationship( "b", "c", "KNOWS" ), // line 1 of file2
relationship( "c", "a", "KNOWS" ), // line 2 of file2
relationship( "missing", "a", "KNOWS" ) ); // line 3 of file2
File relationshipData1 = relationshipData( true, config, relationships.iterator(), lines( 0, 2 ), true );
File relationshipData2 = relationshipData( false, config, relationships.iterator(), lines( 2, 5 ), true );
File bad = file( "bad.log" );
Expand Down Expand Up @@ -474,6 +481,58 @@ private void verifyData(
}
}

private void verifyRelationships( List<RelationshipDataLine> relationships )
{
GraphDatabaseService db = dbRule.getGraphDatabaseService();
Map<String,Node> nodesById = allNodesById( db );
try ( Transaction tx = db.beginTx() )
{
for ( RelationshipDataLine relationship : relationships )
{
Node startNode = nodesById.get( relationship.startNodeId );
Node endNode = nodesById.get( relationship.endNodeId );
if ( startNode == null || endNode == null )
{
// OK this is a relationship refering to a missing node, skip it
continue;
}
assertNotNull( relationship.toString(), findRelationship( startNode, endNode, relationship ) );
}
tx.success();
}
}

private Relationship findRelationship( Node startNode, final Node endNode, final RelationshipDataLine relationship )
{
return singleOrNull( filter( new Predicate<Relationship>()
{
@Override
public boolean accept( Relationship item )
{
return item.getEndNode().equals( endNode ) && item.getProperty( "name" ).equals( relationship.name );
}
}, startNode.getRelationships( withName( relationship.type ) ).iterator() ) );
}

private Map<String,Node> allNodesById( GraphDatabaseService db )
{
try ( Transaction tx = db.beginTx() )
{
Map<String,Node> nodes = new HashMap<>();
for ( Node node : GlobalGraphOperations.at( db ).getAllNodes() )
{
nodes.put( idOf( node ), node );
}
tx.success();
return nodes;
}
}

private String idOf( Node node )
{
return (String) node.getProperty( "id" );
}

private List<String> nodeIds()
{
return nodeIds( NODE_COUNT );
Expand Down Expand Up @@ -589,7 +648,7 @@ private File relationshipData( boolean includeHeader, Configuration config, List
}

private File relationshipData( boolean includeHeader, Configuration config,
Iterator<Triplet<String,String,String>> data, PrimitiveIntPredicate linePredicate,
Iterator<RelationshipDataLine> data, PrimitiveIntPredicate linePredicate,
boolean specifyType ) throws FileNotFoundException
{
File file = file( fileName( "relationships.csv" ) );
Expand Down Expand Up @@ -638,11 +697,50 @@ private void writeRelationshipHeader( PrintStream writer, Configuration config,
idEntry( null, Type.START_ID, startIdGroup ) + delimiter +
idEntry( null, Type.END_ID, endIdGroup ) +
(specifyType ? (delimiter + ":" + Type.TYPE) : "") +
delimiter + "created:long" );
delimiter + "created:long" +
delimiter + "name:String" );
}

private static class RelationshipDataLine
{
private final String startNodeId;
private final String endNodeId;
private final String type;
private final String name;

RelationshipDataLine( String startNodeId, String endNodeId, String type, String name )
{
this.startNodeId = startNodeId;
this.endNodeId = endNodeId;
this.type = type;
this.name = name;
}

@Override
public String toString()
{
return "RelationshipDataLine [startNodeId=" + startNodeId + ", endNodeId=" + endNodeId + ", type=" + type
+ ", name=" + name + "]";
}
}

private static RelationshipDataLine relationship( String startNodeId, String endNodeId )
{
return relationship( startNodeId, endNodeId, null );
}

private static RelationshipDataLine relationship( String startNodeId, String endNodeId, String type )
{
return relationship( startNodeId, endNodeId, type, null );
}

private static RelationshipDataLine relationship( String startNodeId, String endNodeId, String type, String name )
{
return new RelationshipDataLine( startNodeId, endNodeId, type, name );
}

private void writeRelationshipData( PrintStream writer, Configuration config,
Iterator<Triplet<String,String,String>> data, PrimitiveIntPredicate linePredicate, boolean specifyType )
Iterator<RelationshipDataLine> data, PrimitiveIntPredicate linePredicate, boolean specifyType )
{
char delimiter = config.delimiter();
for ( int i = 0; i < RELATIONSHIP_COUNT; i++ )
Expand All @@ -651,28 +749,31 @@ private void writeRelationshipData( PrintStream writer, Configuration config,
{
break;
}
Triplet<String,String,String> entry = data.next();
RelationshipDataLine entry = data.next();
if ( linePredicate.accept( i ) )
{
writer.println( entry.first() +
delimiter + entry.second() +
(specifyType ? (delimiter + entry.third()) : "") +
delimiter + currentTimeMillis() );
writer.println( entry.startNodeId +
delimiter + entry.endNodeId +
(specifyType ? (delimiter + entry.type) : "") +
delimiter + currentTimeMillis() +
delimiter + (entry.name != null ? entry.name : "")
);
}
}
}

private Iterator<Triplet<String,String,String>> randomRelationships( final List<String> nodeIds )
private Iterator<RelationshipDataLine> randomRelationships( final List<String> nodeIds )
{
return new PrefetchingIterator<Triplet<String,String,String>>()
return new PrefetchingIterator<RelationshipDataLine>()
{
@Override
protected Triplet<String,String,String> fetchNextOrNull()
protected RelationshipDataLine fetchNextOrNull()
{
return Triplet.of(
return new RelationshipDataLine(
nodeIds.get( random.nextInt( nodeIds.size() ) ),
nodeIds.get( random.nextInt( nodeIds.size() ) ),
randomType() );
randomType(),
null );
}
};
}
Expand Down
Expand Up @@ -87,32 +87,33 @@ protected Object process( long ticket, Batch<INPUT,RECORD> batch )
for ( int i = 0; i < records.length; i++ )
{
RECORD record = records[i];
if ( record == null )
{ // Here we have a relationship that refers to missing nodes. It's within the tolerance levels
// of number of bad relationships. Just don't import this relationship.
continue;
}

INPUT input = batch.input[i];
if ( input.hasFirstPropertyId() )
{
record.setNextProp( input.firstPropertyId() );
}
else
int propertyBlockCount = batch.propertyBlocksLengths[i];
if ( record != null )
{
int propertyBlockCount = batch.propertyBlocksLengths[i];
if ( propertyBlockCount > 0 )
INPUT input = batch.input[i];
if ( input.hasFirstPropertyId() )
{
reassignDynamicRecordIds( batch.propertyBlocks, propertyBlockCursor, propertyBlockCount );
long firstProp = propertyCreator.createPropertyChain( record,
blockIterator.dressArray( batch.propertyBlocks, propertyBlockCursor, propertyBlockCount ),
propertyRecords );
record.setNextProp( firstProp );
propertyBlockCursor += propertyBlockCount;
record.setNextProp( input.firstPropertyId() );
}
else
{
if ( propertyBlockCount > 0 )
{
reassignDynamicRecordIds( batch.propertyBlocks, propertyBlockCursor, propertyBlockCount );
long firstProp = propertyCreator.createPropertyChain( record,
blockIterator.dressArray( batch.propertyBlocks, propertyBlockCursor, propertyBlockCount ),
propertyRecords );
record.setNextProp( firstProp );
}
}
highestId = max( highestId, record.getId() );
entityStore.updateRecord( record );
}
else
{ // Here we have a relationship that refers to missing nodes. It's within the tolerance levels
// of number of bad relationships. Just don't import this relationship.
}
highestId = max( highestId, record.getId() );
entityStore.updateRecord( record );
propertyBlockCursor += propertyBlockCount;
}
entityStore.setHighestPossibleIdInUse( highestId );

Expand Down

0 comments on commit e65d2b6

Please sign in to comment.