Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/3.2' into 3.3
Browse files Browse the repository at this point in the history
  • Loading branch information
tinwelint committed Aug 17, 2017
2 parents eaa0d47 + 4f4b168 commit ca75fae
Show file tree
Hide file tree
Showing 14 changed files with 189 additions and 79 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ public void doImport() throws IOException
idType,
new WrappedCsvInputConfigurationForNeo4jAdmin( csvConfiguration( args, false ) ),
badCollector,
configuration.maxNumberOfProcessors() );
configuration.maxNumberOfProcessors(), !ignoreBadRelationships );

ImportTool.doImport( outsideWorld.errorStream(), outsideWorld.errorStream(), storeDir, logsDir, reportFile, fs,
nodesFiles, relationshipsFiles, false, input, this.databaseConfig, badOutput, configuration );
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -460,7 +460,7 @@ public static void main( String[] incomingArguments, boolean defaultSettingsSuit
input = new CsvInput( nodeData( inputEncoding, nodesFiles ), defaultFormatNodeFileHeader(),
relationshipData( inputEncoding, relationshipsFiles ), defaultFormatRelationshipFileHeader(),
idType, csvConfiguration( args, defaultSettingsSuitableForTests ), badCollector,
configuration.maxNumberOfProcessors() );
configuration.maxNumberOfProcessors(), !skipBadRelationships );

doImport( out, err, storeDir, logsDir, badFile, fs, nodesFiles, relationshipsFiles,
enableStacktrace, input, dbConfig, badOutput, configuration );
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1244,6 +1244,7 @@ private void shouldPrintReferenceLinkAsPartOfErrorMessage( List<String> nodeIds,
importTool(
"--into", dbRule.getStoreDirAbsolutePath(),
"--nodes", nodeData( true, config, nodeIds, TRUE ).getAbsolutePath(),
"--skip-bad-relationships", "false",
"--relationships", relationshipData( true, config, relationshipDataLines,
TRUE, true ).getAbsolutePath() );
fail( " Should fail during import." );
Expand Down Expand Up @@ -1831,6 +1832,47 @@ public void shouldRespectMaxMemorySuffixedSetting() throws Exception
"--max-memory", "100M" );
}

@Test
public void shouldTreatRelationshipWithMissingStartOrEndIdOrTypeAsBadRelationship() throws Exception
{
    // GIVEN three nodes and three relationships, each relationship lacking one mandatory field
    List<String> ids = asList( "a", "b", "c" );
    Configuration csvConfig = Configuration.COMMAS;
    File nodeFile = nodeData( true, csvConfig, ids, TRUE );

    List<RelationshipDataLine> relationshipLines = Arrays.asList(
            relationship( "a", null, "TYPE" ), // no end node id
            relationship( null, "b", "TYPE" ), // no start node id
            relationship( "a", "b", null ) );  // no type

    File relationshipFile = relationshipData( true, csvConfig, relationshipLines.iterator(), TRUE, true );
    File badLog = badFile();

    // WHEN importing with --skip-bad-relationships=true so incomplete relationships are collected, not fatal
    importTool(
            "--into", dbRule.getStoreDirAbsolutePath(),
            "--nodes", nodeFile.getAbsolutePath(),
            "--bad", badLog.getAbsolutePath(),
            "--skip-bad-relationships", "true",
            "--relationships", relationshipFile.getAbsolutePath() );

    // THEN each of the three incomplete relationships should be reported in the bad-entries file
    String badContents = FileUtils.readTextFile( badLog, Charset.defaultCharset() );
    assertEquals( badContents, 3, occurencesOf( badContents, "is missing data" ) );
}

/**
 * Counts how many times {@code lookFor} occurs in {@code text}.
 * Successive searches start one character after the previous hit,
 * so overlapping occurrences are counted.
 */
private static int occurencesOf( String text, String lookFor )
{
    int count = 0;
    int index = text.indexOf( lookFor );
    while ( index != -1 )
    {
        count++;
        index = text.indexOf( lookFor, index + 1 );
    }
    return count;
}

private File writeArrayCsv( String[] headers, String[] values ) throws FileNotFoundException
{
File data = file( fileName( "whitespace.csv" ) );
Expand Down Expand Up @@ -2259,16 +2301,21 @@ private void writeRelationshipData( PrintStream writer, Configuration config,
RelationshipDataLine entry = data.next();
if ( linePredicate.test( i ) )
{
writer.println( entry.startNodeId +
delimiter + entry.endNodeId +
(specifyType ? (delimiter + entry.type) : "") +
writer.println( nullSafeString( entry.startNodeId ) +
delimiter + nullSafeString( entry.endNodeId ) +
(specifyType ? (delimiter + nullSafeString( entry.type )) : "") +
delimiter + currentTimeMillis() +
delimiter + (entry.name != null ? entry.name : "")
);
}
}
}

/**
 * Converts {@code null} to an empty string, leaving non-null values untouched.
 * Used when writing CSV fields so that an absent value becomes an empty cell
 * rather than the literal text {@code "null"}.
 *
 * @param value string to sanitize, may be {@code null}. (Renamed from the
 * misleading {@code endNodeId} — callers pass start id, end id and type alike.)
 * @return {@code value} if non-null, otherwise {@code ""}.
 */
private static String nullSafeString( String value )
{
    return value != null ? value : "";
}

private Iterator<RelationshipDataLine> randomRelationships( final List<String> nodeIds )
{
return new PrefetchingIterator<RelationshipDataLine>()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,14 @@

import org.neo4j.kernel.impl.store.record.RelationshipRecord;
import org.neo4j.unsafe.impl.batchimport.cache.idmapping.IdMapper;
import org.neo4j.unsafe.impl.batchimport.input.Group;
import org.neo4j.unsafe.impl.batchimport.input.InputRelationship;
import org.neo4j.unsafe.impl.batchimport.staging.BatchSender;
import org.neo4j.unsafe.impl.batchimport.staging.ProcessorStep;
import org.neo4j.unsafe.impl.batchimport.staging.StageControl;

import static org.neo4j.unsafe.impl.batchimport.cache.idmapping.IdMapper.ID_NOT_FOUND;

/**
* Prepares {@link InputRelationship}, or at least potential slow parts of it, namely {@link IdMapper} lookup.
* This step is also parallelizable so if it becomes a bottleneck then more processors will automatically
Expand All @@ -49,9 +52,15 @@ protected void process( Batch<InputRelationship,RelationshipRecord> batch, Batch
for ( int i = 0; i < input.length; i++ )
{
InputRelationship batchRelationship = input[i];
ids[i * 2] = idMapper.get( batchRelationship.startNode(), batchRelationship.startNodeGroup() );
ids[i * 2 + 1] = idMapper.get( batchRelationship.endNode(), batchRelationship.endNodeGroup() );
boolean hasType = batchRelationship.hasType();
ids[i * 2] = lookup( batchRelationship.startNode(), batchRelationship.startNodeGroup(), hasType );
ids[i * 2 + 1] = lookup( batchRelationship.endNode(), batchRelationship.endNodeGroup(), hasType );
}
sender.send( batch );
}

/**
 * Resolves an input node id to its internal id via the {@link IdMapper}, or returns
 * {@link IdMapper#ID_NOT_FOUND} without a lookup when the relationship cannot be valid
 * anyway — i.e. the node id is {@code null} or the relationship has no type.
 */
private long lookup( Object nodeId, Group group, boolean hasType )
{
    if ( nodeId == null || !hasType )
    {
        return ID_NOT_FOUND;
    }
    return idMapper.get( nodeId, group );
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,16 +58,10 @@ protected void process( Batch<InputRelationship,RelationshipRecord> batch, Batch
InputRelationship batchRelationship = batch.input[i];
long startNodeId = batch.ids[idIndex++];
long endNodeId = batch.ids[idIndex++];
if ( startNodeId == ID_NOT_FOUND || endNodeId == ID_NOT_FOUND )
boolean hasType = batchRelationship.hasType();
if ( startNodeId == ID_NOT_FOUND || endNodeId == ID_NOT_FOUND || !hasType )
{
if ( startNodeId == ID_NOT_FOUND )
{
badCollector.collectBadRelationship( batchRelationship, batchRelationship.startNode() );
}
if ( endNodeId == ID_NOT_FOUND )
{
badCollector.collectBadRelationship( batchRelationship, batchRelationship.endNode() );
}
collectBadRelationship( batchRelationship, startNodeId, endNodeId, hasType );
}
else
{
Expand All @@ -82,10 +76,29 @@ protected void process( Batch<InputRelationship,RelationshipRecord> batch, Batch
relationship.setSecondNode( endNodeId );

int typeId = batchRelationship.hasTypeId() ? batchRelationship.typeId() :
relationshipTypeRepository.getOrCreateId( batchRelationship.type() );
relationshipTypeRepository.getOrCreateId( batchRelationship.type() );
relationship.setType( typeId );
}
}
sender.send( batch );
}

/**
 * Reports an incomplete relationship to the {@link #badCollector}. A relationship without
 * a type is reported once with no specific offending value; otherwise each end that could
 * not be resolved (start and/or end node) is reported with its input node id.
 */
private void collectBadRelationship( InputRelationship batchRelationship, long startNodeId, long endNodeId, boolean hasType )
{
    if ( !hasType )
    {
        // Missing type: there is no single offending node id to point at
        badCollector.collectBadRelationship( batchRelationship, null );
        return;
    }
    if ( startNodeId == ID_NOT_FOUND )
    {
        badCollector.collectBadRelationship( batchRelationship, batchRelationship.startNode() );
    }
    if ( endNodeId == ID_NOT_FOUND )
    {
        badCollector.collectBadRelationship( batchRelationship, batchRelationship.endNode() );
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ protected void process( Batch<InputRelationship,RelationshipRecord> batch, Batch
Map<Object,MutableLong> typeMap = typeCheckers.computeIfAbsent( currentThread(), ( t ) -> new HashMap<>() );
Stream.of( batch.input )
.map( InputRelationship::typeAsObject )
.filter( type -> type != null )
.forEach( type -> typeMap.computeIfAbsent( type, NEW_MUTABLE_LONG ).increment() );
sender.send( batch );
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,10 +173,17 @@ private String getReportMessage()
{
if ( message == null )
{
message = format( "%s referring to missing node %s", relationship, specificValue );
message = !isMissingData( relationship )
? format( "%s referring to missing node %s", relationship, specificValue )
: format( "%s is missing data", relationship );
}
return message;
}

/**
 * A relationship is considered to be missing data when any mandatory field is absent:
 * start node id, end node id or type. Checks are ordered to match the short-circuit
 * evaluation of the original expression.
 */
private static boolean isMissingData( InputRelationship relationship )
{
    if ( relationship.startNode() == null )
    {
        return true;
    }
    if ( relationship.endNode() == null )
    {
        return true;
    }
    return !relationship.hasType();
}
}

private static class NodesProblemReporter implements ProblemReporter
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,15 @@ public int typeId()
return typeId.intValue();
}

/**
 * @return whether or not this relationship has a type assigned to it, either as a
 * type id (in which case {@link #hasTypeId()} returns {@code true}) or as a
 * non-null type name returned by {@link #type()}.
 */
public boolean hasType()
{
    if ( hasTypeId() )
    {
        return true;
    }
    return type() != null;
}

public void setType( String type )
{
this.type = type;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,24 +19,10 @@
*/
package org.neo4j.unsafe.impl.batchimport.input;

import java.io.File;

import org.neo4j.unsafe.impl.batchimport.InputIterable;
import org.neo4j.unsafe.impl.batchimport.cache.NumberArrayFactory;
import org.neo4j.unsafe.impl.batchimport.cache.idmapping.IdGenerator;
import org.neo4j.unsafe.impl.batchimport.cache.idmapping.IdMapper;
import org.neo4j.unsafe.impl.batchimport.input.csv.Configuration;
import org.neo4j.unsafe.impl.batchimport.input.csv.CsvInput;
import org.neo4j.unsafe.impl.batchimport.input.csv.IdType;

import static java.nio.charset.Charset.defaultCharset;
import static org.neo4j.unsafe.impl.batchimport.input.InputEntityDecorators.NO_NODE_DECORATOR;
import static org.neo4j.unsafe.impl.batchimport.input.InputEntityDecorators.NO_RELATIONSHIP_DECORATOR;
import static org.neo4j.unsafe.impl.batchimport.input.csv.DataFactories.data;
import static org.neo4j.unsafe.impl.batchimport.input.csv.DataFactories.defaultFormatNodeFileHeader;
import static org.neo4j.unsafe.impl.batchimport.input.csv.DataFactories.defaultFormatRelationshipFileHeader;
import static org.neo4j.unsafe.impl.batchimport.input.csv.DataFactories.nodeData;
import static org.neo4j.unsafe.impl.batchimport.input.csv.DataFactories.relationshipData;

public class Inputs
{
Expand Down Expand Up @@ -81,14 +67,4 @@ public Collector badCollector()
}
};
}

public static Input csv( File nodes, File relationships, IdType idType,
Configuration configuration, Collector badCollector, int maxProcessors )
{
return new CsvInput(
nodeData( data( NO_NODE_DECORATOR, defaultCharset(), nodes ) ), defaultFormatNodeFileHeader(),
relationshipData( data( NO_RELATIONSHIP_DECORATOR, defaultCharset(), relationships ) ),
defaultFormatRelationshipFileHeader(), idType, configuration,
badCollector, maxProcessors );
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.neo4j.unsafe.impl.batchimport.input.Input;
import org.neo4j.unsafe.impl.batchimport.input.InputNode;
import org.neo4j.unsafe.impl.batchimport.input.InputRelationship;
import org.neo4j.unsafe.impl.batchimport.input.MissingRelationshipDataException;
import org.neo4j.unsafe.impl.batchimport.input.csv.InputGroupsDeserializer.DeserializerFactory;

import static org.neo4j.unsafe.impl.batchimport.input.csv.DeserializerFactories.defaultNodeDeserializer;
Expand All @@ -54,6 +55,7 @@ public class CsvInput implements Input
private final Groups groups = new Groups();
private final Collector badCollector;
private final int maxProcessors;
private final boolean validateRelationshipData;

/**
* @param nodeDataFactory multiple {@link DataFactory} instances providing data, each {@link DataFactory}
Expand All @@ -68,11 +70,14 @@ public class CsvInput implements Input
* @param config CSV configuration.
* @param badCollector Collector getting calls about bad input data.
* @param maxProcessors maximum number of processors in scenarios where multiple threads may parse CSV data.
* @param validateRelationshipData whether or not to validate relationship data strictly. If {@code true} then
* {@link MissingRelationshipDataException} will be thrown if some mandatory relationship field is missing, such as
* START_ID, END_ID or TYPE, otherwise if {@code false} such relationships will be collected by the {@code badCollector}.
*/
public CsvInput(
Iterable<DataFactory<InputNode>> nodeDataFactory, Header.Factory nodeHeaderFactory,
Iterable<DataFactory<InputRelationship>> relationshipDataFactory, Header.Factory relationshipHeaderFactory,
IdType idType, Configuration config, Collector badCollector, int maxProcessors )
IdType idType, Configuration config, Collector badCollector, int maxProcessors, boolean validateRelationshipData )
{
this.maxProcessors = maxProcessors;
assertSaneConfiguration( config );
Expand All @@ -84,6 +89,7 @@ public CsvInput(
this.idType = idType;
this.config = config;
this.badCollector = badCollector;
this.validateRelationshipData = validateRelationshipData;
}

private void assertSaneConfiguration( Configuration config )
Expand Down Expand Up @@ -136,7 +142,8 @@ public InputIterator<InputRelationship> iterator()
DeserializerFactory<InputRelationship> factory =
defaultRelationshipDeserializer( groups, config, idType, badCollector );
return new InputGroupsDeserializer<>( relationshipDataFactory.iterator(), relationshipHeaderFactory,
config, idType, maxProcessors, 1, factory, new InputRelationshipValidator(),
config, idType, maxProcessors, 1, factory,
validateRelationshipData ? new InputRelationshipValidator() : Validators.emptyValidator(),
InputRelationship.class );
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ nodeData, defaultFormatNodeFileHeader(),
IdType.ACTUAL,
csvConfigurationWithLowBufferSize(),
new BadCollector( NullOutputStream.NULL_OUTPUT_STREAM, 0, 0 ),
Runtime.getRuntime().availableProcessors() );
Runtime.getRuntime().availableProcessors(), true );

// WHEN
try
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,12 +60,16 @@
import org.neo4j.unsafe.impl.batchimport.BatchImporter;
import org.neo4j.unsafe.impl.batchimport.Configuration;
import org.neo4j.unsafe.impl.batchimport.ParallelBatchImporter;
import org.neo4j.unsafe.impl.batchimport.input.Collector;
import org.neo4j.unsafe.impl.batchimport.input.Input;
import org.neo4j.unsafe.impl.batchimport.input.InputNode;
import org.neo4j.unsafe.impl.batchimport.input.InputRelationship;

import static java.lang.Runtime.getRuntime;
import static java.lang.String.format;
import static java.lang.System.currentTimeMillis;
import static java.nio.charset.Charset.defaultCharset;

import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
Expand All @@ -75,8 +79,14 @@
import static org.neo4j.register.Registers.newDoubleLongRegister;
import static org.neo4j.unsafe.impl.batchimport.input.Collectors.silentBadCollector;
import static org.neo4j.unsafe.impl.batchimport.input.InputEntity.NO_PROPERTIES;
import static org.neo4j.unsafe.impl.batchimport.input.Inputs.csv;
import static org.neo4j.unsafe.impl.batchimport.input.InputEntityDecorators.NO_NODE_DECORATOR;
import static org.neo4j.unsafe.impl.batchimport.input.InputEntityDecorators.NO_RELATIONSHIP_DECORATOR;
import static org.neo4j.unsafe.impl.batchimport.input.csv.Configuration.COMMAS;
import static org.neo4j.unsafe.impl.batchimport.input.csv.DataFactories.data;
import static org.neo4j.unsafe.impl.batchimport.input.csv.DataFactories.defaultFormatNodeFileHeader;
import static org.neo4j.unsafe.impl.batchimport.input.csv.DataFactories.defaultFormatRelationshipFileHeader;
import static org.neo4j.unsafe.impl.batchimport.input.csv.DataFactories.nodeData;
import static org.neo4j.unsafe.impl.batchimport.input.csv.DataFactories.relationshipData;
import static org.neo4j.unsafe.impl.batchimport.staging.ExecutionMonitors.invisible;

public class CsvInputBatchImportIT
Expand Down Expand Up @@ -124,7 +134,17 @@ IdType.STRING, lowBufferSize( COMMAS ), silentBadCollector( 0 ),
}
}

private org.neo4j.unsafe.impl.batchimport.input.csv.Configuration lowBufferSize(
/**
 * Convenience factory creating a {@link CsvInput} over one node file and one relationship
 * file, using the default-format headers, the platform default charset and no input
 * decoration. The trailing {@code true} enables strict relationship validation
 * (see the {@code validateRelationshipData} parameter of the {@link CsvInput} constructor),
 * so relationships missing START_ID, END_ID or TYPE fail the import instead of being
 * collected as bad entries.
 */
public static Input csv( File nodes, File relationships, IdType idType,
org.neo4j.unsafe.impl.batchimport.input.csv.Configuration configuration, Collector badCollector, int maxProcessors )
{
return new CsvInput(
nodeData( data( NO_NODE_DECORATOR, defaultCharset(), nodes ) ), defaultFormatNodeFileHeader(),
relationshipData( data( NO_RELATIONSHIP_DECORATOR, defaultCharset(), relationships ) ),
defaultFormatRelationshipFileHeader(), idType, configuration,
badCollector, maxProcessors, true );
}

private static org.neo4j.unsafe.impl.batchimport.input.csv.Configuration lowBufferSize(
org.neo4j.unsafe.impl.batchimport.input.csv.Configuration actual )
{
return new org.neo4j.unsafe.impl.batchimport.input.csv.Configuration.Overridden( actual )
Expand Down

0 comments on commit ca75fae

Please sign in to comment.