Improved readability and typo fixes in CsvInput and thereabout
tinwelint committed Sep 12, 2016
1 parent 3cce79b commit dad0218
Showing 16 changed files with 49 additions and 34 deletions.
@@ -58,15 +58,15 @@ public Function<Range,InputNode[]> nodes()
    {
        return batch -> new SimpleDataGeneratorBatch<>( nodeHeader, batch.getStart(), randomSeed + batch.getStart(),
                nodeCount, labels, relationshipTypes,
-               new InputNodeDeserialization( SimpleDataGenerator.this, nodeHeader, groups, idType.idsAreExternal() ),
+               new InputNodeDeserialization( nodeHeader, SimpleDataGenerator.this, groups, idType.idsAreExternal() ),
                new InputNode[batch.getSize()] ).get();
    }

    public Function<Range,InputRelationship[]> relationships()
    {
        return batch -> new SimpleDataGeneratorBatch<>( relationshipHeader, batch.getStart(),
                randomSeed + batch.getStart(), nodeCount, labels, relationshipTypes,
-               new InputRelationshipDeserialization( SimpleDataGenerator.this, relationshipHeader, groups ),
+               new InputRelationshipDeserialization( relationshipHeader, SimpleDataGenerator.this, groups ),
                new InputRelationship[batch.getSize()] ).get();
    }
@@ -55,6 +55,10 @@ public InputEntity( String sourceDescription, long sourceLineNumber, long source
        this.firstPropertyId = firstPropertyId;
    }

+   /**
+    * @return properties on this entity. Properties sits in one array with alternating keys (even indexes)
+    * and values (odd indexes).
+    */
    public Object[] properties()
    {
        return properties;
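The javadoc added in the hunk above describes an alternating key/value layout for the properties array. A minimal sketch of reading that layout, assuming nothing beyond plain Java (the class below is illustrative, not part of the commit):

    // Illustrative only (not code from the commit): walking the alternating
    // key/value layout that the new properties() javadoc describes.
    class PropertiesLayoutExample
    {
        public static void main( String[] args )
        {
            Object[] properties = { "name", "Mattias", "age", 40 };
            for ( int i = 0; i < properties.length; i += 2 )
            {
                Object key = properties[i];       // even index: property key
                Object value = properties[i + 1]; // odd index: property value
                System.out.println( key + " = " + value );
            }
        }
    }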
@@ -69,11 +69,11 @@ public char arrayDelimiter()
        }
    };

-   class Overriden extends org.neo4j.csv.reader.Configuration.Overridden implements Configuration
+   class Overridden extends org.neo4j.csv.reader.Configuration.Overridden implements Configuration
    {
        private final Configuration defaults;

-       public Overriden( Configuration defaults )
+       public Overridden( Configuration defaults )
        {
            super( defaults );
            this.defaults = defaults;
@@ -108,12 +108,15 @@ public InputIterable<InputNode> nodes()
            @Override
            public InputIterator<InputNode> iterator()
            {
-               DeserializerFactory<InputNode> factory = (dataStream, dataHeader, decorator, validator) ->
-                       new InputEntityDeserializer<>( dataHeader, dataStream, config.delimiter(),
-                               new InputNodeDeserialization( dataStream, dataHeader, groups, idType.idsAreExternal() ),
-                               decorator, validator, badCollector );
+               DeserializerFactory<InputNode> factory = (dataHeader, dataStream, decorator, validator) ->
+               {
+                   InputNodeDeserialization deserialization =
+                           new InputNodeDeserialization( dataHeader, dataStream, groups, idType.idsAreExternal() );
+                   return new InputEntityDeserializer<>( dataHeader, dataStream, config.delimiter(),
+                           deserialization, decorator, validator, badCollector );
+               };
                return new InputGroupsDeserializer<>( nodeDataFactory.iterator(), nodeHeaderFactory, config,
-                       idType, maxProcessors, factory,Validators.<InputNode>emptyValidator(), InputNode.class );
+                       idType, maxProcessors, factory, Validators.<InputNode>emptyValidator(), InputNode.class );
            }

            @Override
@@ -132,10 +135,13 @@ public InputIterable<InputRelationship> relationships()
            @Override
            public InputIterator<InputRelationship> iterator()
            {
-               DeserializerFactory<InputRelationship> factory = (dataStream, dataHeader, decorator, validator) ->
-                       new InputEntityDeserializer<>( dataHeader, dataStream, config.delimiter(),
-                               new InputRelationshipDeserialization( dataStream, dataHeader, groups ),
-                               decorator, validator, badCollector );
+               DeserializerFactory<InputRelationship> factory = (dataHeader, dataStream, decorator, validator) ->
+               {
+                   InputRelationshipDeserialization deserialization =
+                           new InputRelationshipDeserialization( dataHeader, dataStream, groups );
+                   return new InputEntityDeserializer<>( dataHeader, dataStream, config.delimiter(),
+                           deserialization, decorator, validator, badCollector );
+               };
                return new InputGroupsDeserializer<>( relationshipDataFactory.iterator(), relationshipHeaderFactory,
                        config, idType, maxProcessors, factory, new InputRelationshipValidator(),
                        InputRelationship.class );
@@ -27,6 +27,7 @@
import org.neo4j.unsafe.impl.batchimport.input.UpdateBehaviour;

import static org.neo4j.csv.reader.CharSeekers.charSeeker;
+import static org.neo4j.unsafe.impl.batchimport.input.InputEntityDecorators.NO_NODE_DECORATOR;

/**
 * Pulls in properties from an external CSV source and amends them to the "main" input nodes.
@@ -47,12 +48,15 @@
 * <Pre>
 * Then properties {@code abc@somewhere} and {@code def@somewhere} will be amended to input node {@code 1}
 * and {@code ghi@someplace} to input node {@code 3}.
+ *
+ * NOTE that order the input data (where we key on ID) is assumed to be the same, there are no checks
+ * for trying to verify this constraint though.
 */
public class ExternalPropertiesDecorator implements Decorator<InputNode>
{
    private final InputEntityDeserializer<InputNode> deserializer;
    private final UpdateBehaviour updateBehaviour;
-   private volatile InputNode currentExternal;
+   private InputNode currentExternal;

    /**
     * @param headerFactory creates a {@link Header} that will specify which field is the {@link Type#ID id field}
@@ -65,8 +69,8 @@ public ExternalPropertiesDecorator( DataFactory<InputNode> data, Header.Factory
        CharSeeker dataStream = charSeeker( data.create( config ).stream(), config, true );
        Header header = headerFactory.create( dataStream, config, idType );
        this.deserializer = new InputEntityDeserializer<>( header, dataStream, config.delimiter(),
-               new InputNodeDeserialization( dataStream, header, new Groups(), idType.idsAreExternal() ),
-               value -> value, Validators.<InputNode>emptyValidator(), badCollector );
+               new InputNodeDeserialization( header, dataStream, new Groups(), idType.idsAreExternal() ),
+               NO_NODE_DECORATOR, Validators.<InputNode>emptyValidator(), badCollector );
    }

    @Override
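The new javadoc note assumes the external property source lists IDs in the same order as the main node source, and says nothing verifies this. A hedged illustration of inputs that satisfy that assumption (file contents and names invented for this example, loosely following the existing javadoc example):

    // Hypothetical data, only to illustrate the same-order constraint the javadoc mentions.
    class ExternalPropertiesExample
    {
        // Main node source, keyed on :ID
        static final String NODES =
                ":ID,name\n" +
                "1,First\n" +
                "2,Second\n" +
                "3,Third\n";

        // External property source: same ID field, IDs appearing in the same order (1, 1, 3).
        // Nothing checks this ordering, as the new javadoc note points out.
        static final String EXTERNAL_PROPERTIES =
                ":ID,email\n" +
                "1,abc@somewhere\n" +
                "1,def@somewhere\n" +
                "3,ghi@someplace\n";
    }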
@@ -55,7 +55,7 @@ class InputGroupsDeserializer<ENTITY extends InputEntity>
    @FunctionalInterface
    public interface DeserializerFactory<ENTITY extends InputEntity>
    {
-       InputEntityDeserializer<ENTITY> create( CharSeeker dataStream, Header dataHeader,
+       InputEntityDeserializer<ENTITY> create( Header dataHeader, CharSeeker dataStream,
                Function<ENTITY,ENTITY> decorator, Validator<ENTITY> validator );
    }

@@ -94,7 +94,7 @@ protected InputIterator<ENTITY> createNestedIterator( DataFactory<ENTITY> dataFa
        Header dataHeader = headerFactory.create( dataStream, config, idType );

        InputEntityDeserializer<ENTITY> input =
-               factory.create( dataStream, dataHeader, data.decorator(), validator );
+               factory.create( dataHeader, dataStream, data.decorator(), validator );
        // It's important that we assign currentInput before calling initialize(), so that if something
        // goes wrong in initialize() and our close() is called we close it properly.
        currentInput = input;
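The comment retained in this hunk explains why currentInput is assigned before initialize() is called. A minimal, self-contained sketch of that assign-before-initialize pattern; Initializable and Holder below are invented stand-ins, not the real InputGroupsDeserializer types:

    // Sketch only: publish the resource before initializing it, so a close()
    // triggered by an initialization failure can still reach and close it.
    interface Initializable extends AutoCloseable
    {
        void initialize() throws Exception;
    }

    class Holder implements AutoCloseable
    {
        private Initializable currentInput;

        void open( Initializable input ) throws Exception
        {
            currentInput = input;   // assign first, as the comment above explains
            input.initialize();     // if this throws, close() below still sees currentInput
        }

        @Override
        public void close() throws Exception
        {
            if ( currentInput != null )
            {
                currentInput.close();
            }
        }
    }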
@@ -44,7 +44,7 @@ public class InputNodeDeserialization extends InputEntityDeserialization<InputNo
    private String[] labels = new String[10];
    private int labelsCursor;

-   public InputNodeDeserialization( SourceTraceability source, Header header, Groups groups, boolean idsAreExternal )
+   public InputNodeDeserialization( Header header, SourceTraceability source, Groups groups, boolean idsAreExternal )
    {
        super( source );
        this.header = header;
@@ -39,7 +39,7 @@ public class InputRelationshipDeserialization extends InputEntityDeserialization
    private Object startNode;
    private Object endNode;

-   public InputRelationshipDeserialization( SourceTraceability source, Header header, Groups groups )
+   public InputRelationshipDeserialization( Header header, SourceTraceability source, Groups groups )
    {
        super( source );
        this.header = header;
@@ -99,7 +99,7 @@ public ParallelInputEntityDeserializer( Data<ENTITY> data, Header.Factory header
            // to cater for decorators which may be mutable and sensitive to ordering, while still putting
            // the work of decorating and validating on the processing threads as to not affect performance.
            InputEntityDeserializer<ENTITY> chunkDeserializer =
-                   factory.create( seeker, header, batchDecorator, batchValidator );
+                   factory.create( header, seeker, batchDecorator, batchValidator );
            chunkDeserializer.initialize();
            List<ENTITY> entities = new ArrayList<>();
            while ( chunkDeserializer.hasNext() )
@@ -116,7 +116,7 @@ IdType.STRING, lowBufferSize( COMMAS ), silentBadCollector( 0 ),
    private org.neo4j.unsafe.impl.batchimport.input.csv.Configuration lowBufferSize(
            org.neo4j.unsafe.impl.batchimport.input.csv.Configuration actual )
    {
-       return new org.neo4j.unsafe.impl.batchimport.input.csv.Configuration.Overriden( actual )
+       return new org.neo4j.unsafe.impl.batchimport.input.csv.Configuration.Overridden( actual )
        {
            @Override
            public int bufferSize()
@@ -795,7 +795,7 @@ public void shouldTreatEmptyQuotedStringsAsNullIfConfiguredTo() throws Exception
        Iterable<DataFactory<InputNode>> data = DataFactories.nodeData( CsvInputTest.<InputNode>data(
                ":ID,one,two,three\n" +
                "1,\"\",,value" ) );
-       Configuration config = config( new Configuration.Overriden( COMMAS )
+       Configuration config = config( new Configuration.Overridden( COMMAS )
        {
            @Override
            public boolean emptyQuotedStringsAsNull()
@@ -994,7 +994,7 @@ public InputNode apply( InputNode from ) throws RuntimeException

    private Configuration config( Configuration config )
    {
-       return new Configuration.Overriden( config )
+       return new Configuration.Overridden( config )
        {
            @Override
            public boolean multilineFields()
@@ -283,7 +283,7 @@ private CharSeeker seeker( String data )

    private static Configuration withBufferSize( Configuration config, final int bufferSize )
    {
-       return new Configuration.Overriden( config )
+       return new Configuration.Overridden( config )
        {
            @Override
            public int bufferSize()
@@ -53,7 +53,7 @@ public void shouldDecorateExternalPropertiesInParallelProcessingCsvInput() throw
        int processors = 5;
        Collector collector = mock( Collector.class );
        int count = 1000;
-       Configuration config = new Configuration.Overriden( Configuration.COMMAS )
+       Configuration config = new Configuration.Overridden( Configuration.COMMAS )
        {
            @Override
            public int bufferSize()
@@ -86,6 +86,7 @@ public int bufferSize()
            assertHasProperty( node, "extra", node.id() + "-decorated" );
            if ( i == 0 )
            {
+               // This code is equal to nodes.setProcessors( processors ) (a method which doesn't exist)
                nodes.processors( processors - nodes.processors( 0 ) );
            }
        }
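The added comment says this call is equivalent to a nonexistent nodes.setProcessors( processors ): processors( delta ) adjusts the processor count by delta and returns the resulting count, so passing the difference to the target sets it absolutely. A self-contained sketch of the idiom; the Stage class below is a hypothetical stand-in, not the real iterator API:

    // Hypothetical stand-in, only to illustrate the idiom the added comment describes.
    class ProcessorsIdiom
    {
        static class Stage
        {
            private int processors = 1;

            // Adjusts the processor count by delta and returns the resulting count,
            // mirroring how the test calls nodes.processors( ... ).
            int processors( int delta )
            {
                processors = Math.max( 1, processors + delta );
                return processors;
            }
        }

        public static void main( String[] args )
        {
            Stage nodes = new Stage();
            int target = 5;
            // "setProcessors( target )" emulated with a relative call;
            // processors( 0 ) just reads the current count.
            nodes.processors( target - nodes.processors( 0 ) );
            assert nodes.processors( 0 ) == target;
        }
    }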
@@ -31,7 +31,7 @@
import org.neo4j.unsafe.impl.batchimport.input.InputEntity;
import org.neo4j.unsafe.impl.batchimport.input.InputNode;
import org.neo4j.unsafe.impl.batchimport.input.UpdateBehaviour;
-import org.neo4j.unsafe.impl.batchimport.input.csv.Configuration.Overriden;
+import org.neo4j.unsafe.impl.batchimport.input.csv.Configuration.Overridden;

import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
@@ -131,9 +131,9 @@ private Supplier<CharReadable> readable( final String data )
        return () -> Readables.wrap( new StringReader( data ) );
    }

-   private Overriden config()
+   private Overridden config()
    {
-       return new Configuration.Overriden( Configuration.COMMAS )
+       return new Configuration.Overridden( Configuration.COMMAS )
        {
            @Override
            public int bufferSize()
@@ -52,7 +52,7 @@ public void shouldBeAbleToAskForSourceInformationEvenBetweenTwoSources() throws
        final AtomicReference<InputGroupsDeserializer<InputNode>> deserializerTestHack = new AtomicReference<>( null );
        InputGroupsDeserializer<InputNode> deserializer = new InputGroupsDeserializer<>(
                data.iterator(), defaultFormatNodeFileHeader(), lowBufferSize( COMMAS ), INTEGER,
-               Runtime.getRuntime().availableProcessors(), (stream,header,decorator,validator) ->
+               Runtime.getRuntime().availableProcessors(), (header,stream,decorator,validator) ->
                {
                    // This is the point where the currentInput field in InputGroupsDeserializer was null
                    // so ensure that's no longer the case, just by poking those source methods right here and now.
@@ -82,7 +82,7 @@ public void shouldBeAbleToAskForSourceInformationEvenBetweenTwoSources() throws

    private Configuration lowBufferSize( Configuration conf )
    {
-       return new Configuration.Overriden( conf )
+       return new Configuration.Overridden( conf )
        {
            @Override
            public int bufferSize()
@@ -55,7 +55,7 @@ public void shouldParseDataInParallel() throws Exception
        // GIVEN
        int entities = 500;
        Data<InputNode> data = testData( entities );
-       Configuration config = new Configuration.Overriden( COMMAS )
+       Configuration config = new Configuration.Overridden( COMMAS )
        {
            @Override
            public int bufferSize()
@@ -68,13 +68,13 @@ public int bufferSize()
        Groups groups = new Groups();
        Set<Thread> observedProcessingThreads = new CopyOnWriteArraySet<>();
        int threads = 4;
-       DeserializerFactory<InputNode> deserializerFactory = (chunk,header,decorator,validator) ->
+       DeserializerFactory<InputNode> deserializerFactory = (header,chunk,decorator,validator) ->
        {
            observedProcessingThreads.add( Thread.currentThread() );
            // Make sure there will be 4 different processing threads doing this
            while ( observedProcessingThreads.size() < threads );
            return new InputEntityDeserializer<>( header, chunk, config.delimiter(),
-                   new InputNodeDeserialization( chunk, header, groups, idType.idsAreExternal() ), decorator,
+                   new InputNodeDeserialization( header, chunk, groups, idType.idsAreExternal() ), decorator,
                    validator, badCollector );
        };
        try ( ParallelInputEntityDeserializer<InputNode> deserializer = new ParallelInputEntityDeserializer<>( data,
