Skip to content

Commit

Permalink
Reinstates support for multiline-fields
Browse files Browse the repository at this point in the history
  • Loading branch information
tinwelint committed Jan 5, 2018
1 parent dc1bd29 commit b5919b5
Show file tree
Hide file tree
Showing 18 changed files with 731 additions and 236 deletions.
Expand Up @@ -338,6 +338,7 @@ private boolean fillBuffer() throws IOException
dataLength = nextChunk.length(); dataLength = nextChunk.length();
dataCapacity = nextChunk.maxFieldSize(); dataCapacity = nextChunk.maxFieldSize();
bufferPos = nextChunk.startPosition(); bufferPos = nextChunk.startPosition();
bufferStartPos = bufferPos;
bufferEnd = bufferPos + dataLength; bufferEnd = bufferPos + dataLength;
int shift = seekStartPos - nextChunk.backPosition(); int shift = seekStartPos - nextChunk.backPosition();
seekStartPos = nextChunk.backPosition(); seekStartPos = nextChunk.backPosition();
Expand Down
Expand Up @@ -54,6 +54,7 @@ public void close() throws IOException
reader.close(); reader.close();
} }


@Override
public long position() public long position()
{ {
return position; return position;
Expand Down
5 changes: 5 additions & 0 deletions community/csv/src/main/java/org/neo4j/csv/reader/Chunker.java
Expand Up @@ -49,4 +49,9 @@ public interface Chunker extends Closeable
* @throws IOException on I/O error. * @throws IOException on I/O error.
*/ */
boolean nextChunk( Chunk chunk ) throws IOException; boolean nextChunk( Chunk chunk ) throws IOException;

/**
* @return the position, in bytes, up to which data has been returned from {@link #nextChunk(Chunk)}.
*/
long position();
} }
Expand Up @@ -52,8 +52,7 @@ public interface Extractor<T> extends Cloneable
/** /**
* @return string representation of what type of value it produces. Also used as key in {@link Extractors}. * @return string representation of what type of value it produces. Also used as key in {@link Extractors}.
*/ */
@Override String name();
String toString();


Extractor<T> clone(); Extractor<T> clone();
} }
12 changes: 6 additions & 6 deletions community/csv/src/main/java/org/neo4j/csv/reader/Extractors.java
Expand Up @@ -144,7 +144,7 @@ public Extractors( char arrayDelimiter, boolean emptyStringsAsNull, boolean trim


public void add( Extractor<?> extractor ) public void add( Extractor<?> extractor )
{ {
instances.put( extractor.toString().toUpperCase(), extractor ); instances.put( extractor.name().toUpperCase(), extractor );
} }


public Extractor<?> valueOf( String name ) public Extractor<?> valueOf( String name )
Expand Down Expand Up @@ -244,17 +244,17 @@ public Extractor<double[]> doubleArray()


private abstract static class AbstractExtractor<T> implements Extractor<T> private abstract static class AbstractExtractor<T> implements Extractor<T>
{ {
private final String toString; private final String name;


AbstractExtractor( String toString ) AbstractExtractor( String name )
{ {
this.toString = toString; this.name = name;
} }


@Override @Override
public String toString() public String name()
{ {
return toString; return name;
} }


@Override @Override
Expand Down
Expand Up @@ -90,4 +90,10 @@ public long length()
{ {
return length; return length;
} }

/**
 * @return the {@code sourceDescription} value, which doubles as this object's string representation.
 */
@Override
public String toString()
{
return sourceDescription;
}
} }
Expand Up @@ -42,7 +42,7 @@ public InputEntity( InputEntityVisitor delegate )


public InputEntity() public InputEntity()
{ {
this( new InputEntityVisitor.Adapter() ); this( InputEntityVisitor.NULL );
} }


public boolean hasPropertyId; public boolean hasPropertyId;
Expand Down Expand Up @@ -302,8 +302,90 @@ public int propertyCount()
return properties.size() / 2; return properties.size() / 2;
} }


/**
 * Looks up the key of the {@code i}th property. Keys and values are interleaved in {@code properties},
 * so the key of pair {@code i} sits at the even slot {@code i * 2}.
 *
 * @param i zero-based index of the property pair.
 * @return the key stored for that pair.
 */
public Object propertyKey( int i )
{
    final int keySlot = i * 2;
    return properties.get( keySlot );
}

public Object propertyValue( int i ) public Object propertyValue( int i )
{ {
return properties.get( i * 2 + 1 ); return properties.get( i * 2 + 1 );
} }

/**
 * Replays the state held by this entity onto {@code visitor}. Aspects are replayed in a fixed order:
 * properties, id, labels, start id, end id and type, followed by a final {@code endOfEntity()} call.
 * For every aspect the first variant that is set wins; an aspect with nothing set is skipped.
 *
 * @param visitor receiver of the replayed calls.
 * @throws IOException presumably propagated from the visitor calls.
 */
public void replayOnto( InputEntityVisitor visitor ) throws IOException
{
    replayProperties( visitor );
    replayId( visitor );
    replayLabels( visitor );
    replayStartId( visitor );
    replayEndId( visitor );
    replayType( visitor );

    // Signal that everything about this entity has been handed over
    visitor.endOfEntity();
}

// Replays either the property id or, failing that, every stored key/value pair.
private void replayProperties( InputEntityVisitor visitor ) throws IOException
{
    if ( hasPropertyId )
    {
        visitor.propertyId( propertyId );
        return;
    }
    if ( properties.isEmpty() )
    {
        return;
    }

    final int pairs = propertyCount();
    for ( int pair = 0; pair < pairs; pair++ )
    {
        final Object key = propertyKey( pair );
        final Object value = propertyValue( pair );
        // Keys are stored as Object; the flag tells which visitor overload they belong to
        if ( hasIntPropertyKeyIds )
        {
            visitor.property( (Integer) key, value );
        }
        else
        {
            visitor.property( (String) key, value );
        }
    }
}

// Replays the entity id, preferring the long id over the object id + group.
private void replayId( InputEntityVisitor visitor ) throws IOException
{
    if ( hasLongId )
    {
        visitor.id( longId );
        return;
    }
    if ( objectId != null )
    {
        visitor.id( objectId, idGroup );
    }
}

// Replays the label field if present, otherwise the individual labels (if any).
private void replayLabels( InputEntityVisitor visitor ) throws IOException
{
    if ( hasLabelField )
    {
        visitor.labelField( labelField );
        return;
    }
    if ( !labels.isEmpty() )
    {
        final String[] target = new String[labels.size()];
        visitor.labels( labels.toArray( target ) );
    }
}

// Replays the start id, preferring the long id over the object id + group.
private void replayStartId( InputEntityVisitor visitor ) throws IOException
{
    if ( hasLongStartId )
    {
        visitor.startId( longStartId );
        return;
    }
    if ( objectStartId != null )
    {
        visitor.startId( objectStartId, startIdGroup );
    }
}

// Replays the end id, preferring the long id over the object id + group.
private void replayEndId( InputEntityVisitor visitor ) throws IOException
{
    if ( hasLongEndId )
    {
        visitor.endId( longEndId );
        return;
    }
    if ( objectEndId != null )
    {
        visitor.endId( objectEndId, endIdGroup );
    }
}

// Replays the type, preferring the int type over the string type.
private void replayType( InputEntityVisitor visitor ) throws IOException
{
    if ( hasIntType )
    {
        visitor.type( intType );
        return;
    }
    if ( stringType != null )
    {
        visitor.type( stringType );
    }
}
} }
Expand Up @@ -248,4 +248,8 @@ public void close() throws IOException
actual.close(); actual.close();
} }
} }

/**
 * Shared visitor instance built from an {@link Adapter} with nothing overridden.
 * NOTE(review): presumably ignores every call it receives; this relies on {@link Adapter} itself doing nothing,
 * which should be confirmed against its definition.
 */
InputEntityVisitor NULL = new Adapter()
{ // empty
};
} }
Expand Up @@ -32,7 +32,7 @@
import org.neo4j.unsafe.impl.batchimport.input.InputChunk; import org.neo4j.unsafe.impl.batchimport.input.InputChunk;


/** /**
* Iterates over chunks of CSV input data. * Iterates over one group of input data, e.g. one or more input files. A whole group conforms to the same header.
*/ */
public class CsvGroupInputIterator extends InputIterator.Adapter public class CsvGroupInputIterator extends InputIterator.Adapter
{ {
Expand All @@ -56,13 +56,15 @@ public CsvGroupInputIterator( Iterator<DataFactory> source, Header.Factory heade
} }


@Override @Override
public CsvInputChunk newChunk() public InputChunk newChunk()
{ {
return new CsvInputChunk( idType, config.delimiter(), badCollector, extractors(), return config.multilineFields()
new ChunkImpl( new char[config.bufferSize()] ) ); ? new EagerlyReadInputChunk()
: new CsvInputChunk( idType, config.delimiter(), badCollector, extractors( config ),
new ChunkImpl( new char[config.bufferSize()] ) );
} }


private Extractors extractors() static Extractors extractors( Configuration config )
{ {
return new Extractors( config.arrayDelimiter(), config.emptyQuotedStringsAsNull() ); return new Extractors( config.arrayDelimiter(), config.emptyQuotedStringsAsNull() );
} }
Expand All @@ -80,7 +82,7 @@ public synchronized boolean next( InputChunk chunk ) throws IOException
} }
Data data = source.next().create( config ); Data data = source.next().create( config );
current = new CsvInputIterator( new MultiReadable( data.stream() ), data.decorator(), current = new CsvInputIterator( new MultiReadable( data.stream() ), data.decorator(),
headerFactory, idType, config, groups ); headerFactory, idType, config, groups, badCollector, extractors( config ) );
} }


if ( current.next( chunk ) ) if ( current.next( chunk ) )
Expand Down
Expand Up @@ -47,6 +47,7 @@
import static org.neo4j.unsafe.impl.batchimport.input.Collector.EMPTY; import static org.neo4j.unsafe.impl.batchimport.input.Collector.EMPTY;
import static org.neo4j.unsafe.impl.batchimport.input.Inputs.calculatePropertySize; import static org.neo4j.unsafe.impl.batchimport.input.Inputs.calculatePropertySize;
import static org.neo4j.unsafe.impl.batchimport.input.Inputs.knownEstimates; import static org.neo4j.unsafe.impl.batchimport.input.Inputs.knownEstimates;
import static org.neo4j.unsafe.impl.batchimport.input.csv.CsvGroupInputIterator.extractors;
import static org.neo4j.unsafe.impl.batchimport.input.csv.CsvInputIterator.extractHeader; import static org.neo4j.unsafe.impl.batchimport.input.csv.CsvInputIterator.extractHeader;


/** /**
Expand Down Expand Up @@ -228,7 +229,8 @@ private long[] sample( Iterable<DataFactory> dataFactories, Header.Factory heade
// Extract the header from the first file in this group // Extract the header from the first file in this group
header = extractHeader( source, headerFactory, idType, config, groups ); header = extractHeader( source, headerFactory, idType, config, groups );
} }
try ( CsvInputIterator iterator = new CsvInputIterator( source, data.decorator(), header, config ); try ( CsvInputIterator iterator = new CsvInputIterator( source, data.decorator(), header, config,
idType, badCollector, extractors( config ) );
InputEntity entity = new InputEntity() ) InputEntity entity = new InputEntity() )
{ {
int entities = 0; int entities = 0;
Expand Down

0 comments on commit b5919b5

Please sign in to comment.