Skip to content

Commit

Permalink
Changed CsvGroupInputIterator <--> InputChunk relationship
Browse files Browse the repository at this point in the history
Implementation details about the different types of
chunks (eager vs lazy) are now pushed further down the call stack.
  • Loading branch information
ragadeeshu authored and tinwelint committed Jan 10, 2018
1 parent 7a78ef4 commit 6933dd3
Show file tree
Hide file tree
Showing 8 changed files with 249 additions and 137 deletions.
Expand Up @@ -43,6 +43,7 @@ public class CsvGroupInputIterator extends InputIterator.Adapter
private final Collector badCollector; private final Collector badCollector;
private final Groups groups; private final Groups groups;
private CsvInputIterator current; private CsvInputIterator current;
private int groupId;


public CsvGroupInputIterator( Iterator<DataFactory> source, Header.Factory headerFactory, public CsvGroupInputIterator( Iterator<DataFactory> source, Header.Factory headerFactory,
IdType idType, Configuration config, Collector badCollector, Groups groups ) IdType idType, Configuration config, Collector badCollector, Groups groups )
Expand All @@ -56,12 +57,9 @@ public CsvGroupInputIterator( Iterator<DataFactory> source, Header.Factory heade
} }


@Override @Override
public InputChunk newChunk() public CsvInputChunkProxy newChunk()
{ {
return config.multilineFields() return new CsvInputChunkProxy();
? new EagerlyReadInputChunk()
: new CsvInputChunk( idType, config.delimiter(), badCollector, extractors( config ),
new ChunkImpl( new char[config.bufferSize()] ) );
} }


static Extractors extractors( Configuration config ) static Extractors extractors( Configuration config )
Expand All @@ -82,10 +80,10 @@ public synchronized boolean next( InputChunk chunk ) throws IOException
} }
Data data = source.next().create( config ); Data data = source.next().create( config );
current = new CsvInputIterator( new MultiReadable( data.stream() ), data.decorator(), current = new CsvInputIterator( new MultiReadable( data.stream() ), data.decorator(),
headerFactory, idType, config, groups, badCollector, extractors( config ) ); headerFactory, idType, config, groups, badCollector, extractors( config ), groupId++ );
} }


if ( current.next( chunk ) ) if ( current.next( (CsvInputChunkProxy) chunk ) )
{ {
return true; return true;
} }
Expand Down
Expand Up @@ -36,12 +36,10 @@
import org.neo4j.unsafe.impl.batchimport.input.Collector; import org.neo4j.unsafe.impl.batchimport.input.Collector;
import org.neo4j.unsafe.impl.batchimport.input.Groups; import org.neo4j.unsafe.impl.batchimport.input.Groups;
import org.neo4j.unsafe.impl.batchimport.input.Input; import org.neo4j.unsafe.impl.batchimport.input.Input;
import org.neo4j.unsafe.impl.batchimport.input.InputChunk;
import org.neo4j.unsafe.impl.batchimport.input.InputEntity; import org.neo4j.unsafe.impl.batchimport.input.InputEntity;
import org.neo4j.values.storable.Value; import org.neo4j.values.storable.Value;


import static org.neo4j.csv.reader.CharSeekers.charSeeker; import static org.neo4j.csv.reader.CharSeekers.charSeeker;
import static org.neo4j.helpers.collection.Iterators.iterator;
import static org.neo4j.io.ByteUnit.mebiBytes; import static org.neo4j.io.ByteUnit.mebiBytes;
import static org.neo4j.unsafe.impl.batchimport.InputIterable.replayable; import static org.neo4j.unsafe.impl.batchimport.InputIterable.replayable;
import static org.neo4j.unsafe.impl.batchimport.input.Collector.EMPTY; import static org.neo4j.unsafe.impl.batchimport.input.Collector.EMPTY;
Expand Down Expand Up @@ -211,12 +209,13 @@ private long[] sample( Iterable<DataFactory> dataFactories, Header.Factory heade
ToIntFunction<Value[]> valueSizeCalculator, ToIntFunction<InputEntity> additionalCalculator ) throws IOException ToIntFunction<Value[]> valueSizeCalculator, ToIntFunction<InputEntity> additionalCalculator ) throws IOException
{ {
long[] estimates = new long[4]; // [entity count, property count, property size, labels (for nodes only)] long[] estimates = new long[4]; // [entity count, property count, property size, labels (for nodes only)]
try ( CsvGroupInputIterator group = new CsvGroupInputIterator( iterator(), headerFactory, idType, config, EMPTY, groups ); try ( CsvInputChunkProxy chunk = new CsvInputChunkProxy() )
InputChunk chunk = group.newChunk() )
{ {
// One group of input files // One group of input files
int groupId = 0;
for ( DataFactory dataFactory : dataFactories ) // one input group for ( DataFactory dataFactory : dataFactories ) // one input group
{ {
groupId++;
Header header = null; Header header = null;
Data data = dataFactory.create( config ); Data data = dataFactory.create( config );
RawIterator<CharReadable,IOException> sources = data.stream(); RawIterator<CharReadable,IOException> sources = data.stream();
Expand All @@ -230,7 +229,7 @@ private long[] sample( Iterable<DataFactory> dataFactories, Header.Factory heade
header = extractHeader( source, headerFactory, idType, config, groups ); header = extractHeader( source, headerFactory, idType, config, groups );
} }
try ( CsvInputIterator iterator = new CsvInputIterator( source, data.decorator(), header, config, try ( CsvInputIterator iterator = new CsvInputIterator( source, data.decorator(), header, config,
idType, badCollector, extractors( config ) ); idType, EMPTY, extractors( config ), groupId );
InputEntity entity = new InputEntity() ) InputEntity entity = new InputEntity() )
{ {
int entities = 0; int entities = 0;
Expand Down
Expand Up @@ -20,98 +20,22 @@
package org.neo4j.unsafe.impl.batchimport.input.csv; package org.neo4j.unsafe.impl.batchimport.input.csv;


import java.io.IOException; import java.io.IOException;
import org.neo4j.csv.reader.CharReadableChunker.ChunkImpl;
import org.neo4j.csv.reader.CharSeeker; import org.neo4j.csv.reader.Chunker;
import org.neo4j.csv.reader.Extractors;
import org.neo4j.csv.reader.Source.Chunk;
import org.neo4j.unsafe.impl.batchimport.input.Collector;
import org.neo4j.unsafe.impl.batchimport.input.InputChunk; import org.neo4j.unsafe.impl.batchimport.input.InputChunk;
import org.neo4j.unsafe.impl.batchimport.input.InputEntityVisitor;


/** /**
* {@link InputChunk} parsing next entry on each call to {@link #next(InputEntityVisitor)}. * {@link InputChunk} that gets data from {@link Chunker}. Making it explicit in the interface simplifies implementation
* where there are different types of {@link Chunker} for different scenarios.
*/ */
public class CsvInputChunk implements InputChunk public interface CsvInputChunk extends InputChunk
{ {
private final IdType idType;
private final int delimiter;
private final Collector badCollector;
private final Chunk processingChunk;

// Set in #initialize
private CsvInputParser parser;
private Decorator decorator;

// Set as #next is called
private InputEntityVisitor previousVisitor;
private InputEntityVisitor visitor;
private final Extractors extractors;

public CsvInputChunk( IdType idType, int delimiter, Collector badCollector, Extractors extractors,
ChunkImpl processingChunk )
{
this.idType = idType;
this.badCollector = badCollector;
this.extractors = extractors;
this.delimiter = delimiter;
this.processingChunk = processingChunk;
}

/** /**
* Called every time this chunk is updated with new data. Potentially this data is from a different * Fills this {@link InputChunk} from the given {@link Chunker}.
* stream of data than the previous, therefore the header and decorator is also updated. *
* @param seeker {@link CharSeeker} able to seek through the data. * @param chunker to read next chunk from.
* @param header {@link Header} spec to read data according to. * @return {@code true} if there was data read, otherwise {@code false}, meaning end of stream.
* @param decorator additional decoration of the {@link InputEntityVisitor} coming into * @throws IOException on I/O read error.
* {@link #next(InputEntityVisitor)}.
* @throws IOException on I/O error.
*/ */
boolean initialize( CharSeeker seeker, Header header, Decorator decorator ) throws IOException boolean fillFrom( Chunker chunker ) throws IOException;
{
closeCurrentParser();
this.decorator = decorator;
this.visitor = null;
this.parser = new CsvInputParser( seeker, delimiter, idType, header, badCollector, extractors );
if ( header.entries().length == 0 )
{
return false;
}
return true;
}

private void closeCurrentParser() throws IOException
{
if ( parser != null )
{
parser.close();
}
}

@Override
public boolean next( InputEntityVisitor nakedVisitor ) throws IOException
{
if ( visitor == null || nakedVisitor != previousVisitor )
{
decorateVisitor( nakedVisitor );
}

return parser.next( visitor );
}

private void decorateVisitor( InputEntityVisitor nakedVisitor )
{
visitor = decorator.apply( nakedVisitor );
previousVisitor = nakedVisitor;
}

protected Chunk processingChunk()
{
return processingChunk;
}

@Override
public void close() throws IOException
{
closeCurrentParser();
}
} }
@@ -0,0 +1,77 @@
/*
* Copyright (c) 2002-2018 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.unsafe.impl.batchimport.input.csv;

import java.io.IOException;
import java.util.function.Supplier;

import org.neo4j.csv.reader.Chunker;
import org.neo4j.unsafe.impl.batchimport.InputIterator;
import org.neo4j.unsafe.impl.batchimport.input.InputEntityVisitor;

/**
 * {@link CsvInputChunk} which proxies a group-specific chunk instance, swapping it out as the
 * stream of data moves between input source groups. The outer {@link CsvGroupInputIterator} is the
 * one handing out chunks — generally a good thing, since it solves a bunch of other problems — but
 * it cannot know which concrete {@link CsvInputChunk} type each input group requires, because that
 * decision belongs to the group itself. This proxy bridges that gap: the actual chunk is
 * instantiated lazily, per group, via {@link #ensureInstantiated(Supplier, int)}.
 */
public class CsvInputChunkProxy implements CsvInputChunk
{
    // The group-specific chunk currently delegated to; null until first ensureInstantiated call.
    private CsvInputChunk actual;
    // Id of the group that 'actual' was created for; -1 marks "no chunk created yet".
    private int groupId = -1;

    /**
     * Makes sure the proxied chunk matches the given group, creating a fresh chunk — and closing
     * the previous one — whenever the group changes or no chunk exists yet.
     *
     * @param newChunk factory for the group-specific {@link CsvInputChunk}.
     * @param groupId id of the input group the next data belongs to.
     * @throws IOException on I/O error while closing the previous chunk.
     */
    public void ensureInstantiated( Supplier<CsvInputChunk> newChunk, int groupId ) throws IOException
    {
        boolean needsNewChunk = actual == null || this.groupId != groupId;
        if ( needsNewChunk )
        {
            closeCurrent();
            actual = newChunk.get();
        }
        this.groupId = groupId;
    }

    @Override
    public boolean fillFrom( Chunker chunker ) throws IOException
    {
        // Delegates straight to the group-specific chunk; ensureInstantiated must have been called.
        return actual.fillFrom( chunker );
    }

    @Override
    public boolean next( InputEntityVisitor visitor ) throws IOException
    {
        return actual.next( visitor );
    }

    @Override
    public void close() throws IOException
    {
        closeCurrent();
    }

    // Closes the currently proxied chunk, if any. Safe to call when no chunk has been created.
    private void closeCurrent() throws IOException
    {
        if ( actual != null )
        {
            actual.close();
        }
    }
}
Expand Up @@ -21,6 +21,7 @@


import java.io.Closeable; import java.io.Closeable;
import java.io.IOException; import java.io.IOException;
import java.util.function.Supplier;


import org.neo4j.csv.reader.BufferedCharSeeker; import org.neo4j.csv.reader.BufferedCharSeeker;
import org.neo4j.csv.reader.CharReadable; import org.neo4j.csv.reader.CharReadable;
Expand All @@ -35,9 +36,8 @@
import org.neo4j.csv.reader.SourceTraceability; import org.neo4j.csv.reader.SourceTraceability;
import org.neo4j.unsafe.impl.batchimport.input.Collector; import org.neo4j.unsafe.impl.batchimport.input.Collector;
import org.neo4j.unsafe.impl.batchimport.input.Groups; import org.neo4j.unsafe.impl.batchimport.input.Groups;
import org.neo4j.unsafe.impl.batchimport.input.InputChunk;

import static java.util.Arrays.copyOf; import static java.util.Arrays.copyOf;
import static org.neo4j.unsafe.impl.batchimport.input.csv.CsvGroupInputIterator.extractors;


/** /**
* Iterates over one stream of data, where all data items conform to the same {@link Header}. * Iterates over one stream of data, where all data items conform to the same {@link Header}.
Expand All @@ -47,26 +47,37 @@ class CsvInputIterator implements SourceTraceability, Closeable
{ {
private final CharReadable stream; private final CharReadable stream;
private final Chunker chunker; private final Chunker chunker;
private final Header header; private final int groupId;
private final Decorator decorator; private final Decorator decorator;
private final Configuration config; private final Supplier<CsvInputChunk> realInputChunkSupplier;


CsvInputIterator( CharReadable stream, Decorator decorator, Header header, Configuration config, IdType idType, Collector badCollector, CsvInputIterator( CharReadable stream, Decorator decorator, Header header, Configuration config, IdType idType, Collector badCollector,
Extractors extractors ) Extractors extractors, int groupId )
{ {
this.stream = stream; this.stream = stream;
this.config = config;
this.decorator = decorator; this.decorator = decorator;
this.header = header; this.groupId = groupId;
this.chunker = config.multilineFields() if ( config.multilineFields() )
? new EagerParserChunker( stream, idType, header, badCollector, extractors, 1_000, config, decorator ) {
: new ClosestNewLineChunker( stream, config.bufferSize() ); // If we're expecting multi-line fields then there's no way to arbitrarily chunk the underlying data source
// and find record delimiters with certainty. This is why we opt for a chunker that does parsing inside
// the call that normally just hands out an arbitrary amount of characters to parse outside and in parallel.
// This chunker is single-threaded, as it was previously too and keeps the functionality of multi-line fields.
this.chunker = new EagerParserChunker( stream, idType, header, badCollector, extractors, 1_000, config, decorator );
this.realInputChunkSupplier = EagerCsvInputChunk::new;
}
else
{
this.chunker = new ClosestNewLineChunker( stream, config.bufferSize() );
this.realInputChunkSupplier = () -> new LazyCsvInputChunk( idType, config.delimiter(), badCollector,
extractors( config ), chunker.newChunk(), config, decorator, header );
}
} }


CsvInputIterator( CharReadable stream, Decorator decorator, Header.Factory headerFactory, IdType idType, CsvInputIterator( CharReadable stream, Decorator decorator, Header.Factory headerFactory, IdType idType, Configuration config, Groups groups,
Configuration config, Groups groups, Collector badCollector, Extractors extractors ) throws IOException Collector badCollector, Extractors extractors, int groupId ) throws IOException
{ {
this( stream, decorator, extractHeader( stream, headerFactory, idType, config, groups ), config, idType, badCollector, extractors ); this( stream, decorator, extractHeader( stream, headerFactory, idType, config, groups ), config, idType, badCollector, extractors, groupId );
} }


static Header extractHeader( CharReadable stream, Header.Factory headerFactory, IdType idType, static Header extractHeader( CharReadable stream, Header.Factory headerFactory, IdType idType,
Expand All @@ -86,27 +97,10 @@ static Header extractHeader( CharReadable stream, Header.Factory headerFactory,
return headerFactory.create( null, null, null, null ); return headerFactory.create( null, null, null, null );
} }


public boolean next( InputChunk chunk ) throws IOException public boolean next( CsvInputChunkProxy proxy ) throws IOException
{
if ( config.multilineFields() )
{
EagerlyReadInputChunk csvChunk = (EagerlyReadInputChunk) chunk;
return chunker.nextChunk( csvChunk );
}

CsvInputChunk csvChunk = (CsvInputChunk) chunk;
Chunk processingChunk = csvChunk.processingChunk();
if ( chunker.nextChunk( processingChunk ) )
{
return initialized( chunk, seeker( processingChunk, config ) );
}
return false;
}

private boolean initialized( InputChunk chunk, CharSeeker seeker ) throws IOException
{ {
CsvInputChunk csvChunk = (CsvInputChunk) chunk; proxy.ensureInstantiated( realInputChunkSupplier, groupId );
return csvChunk.initialize( seeker, header.clone(), decorator ); return proxy.fillFrom( chunker );
} }


@Override @Override
Expand All @@ -128,7 +122,7 @@ public long position()
return chunker.position(); return chunker.position();
} }


private static CharSeeker seeker( Chunk chunk, Configuration config ) static CharSeeker seeker( Chunk chunk, Configuration config )
{ {
return new BufferedCharSeeker( Source.singleChunk( chunk ), config ); return new BufferedCharSeeker( Source.singleChunk( chunk ), config );
} }
Expand Down
Expand Up @@ -21,12 +21,12 @@


import java.io.IOException; import java.io.IOException;


import org.neo4j.csv.reader.Chunker;
import org.neo4j.csv.reader.Source; import org.neo4j.csv.reader.Source;
import org.neo4j.unsafe.impl.batchimport.input.InputChunk;
import org.neo4j.unsafe.impl.batchimport.input.InputEntity; import org.neo4j.unsafe.impl.batchimport.input.InputEntity;
import org.neo4j.unsafe.impl.batchimport.input.InputEntityVisitor; import org.neo4j.unsafe.impl.batchimport.input.InputEntityVisitor;


class EagerlyReadInputChunk implements InputChunk, Source.Chunk class EagerCsvInputChunk implements CsvInputChunk, Source.Chunk
{ {
private InputEntity[] entities; private InputEntity[] entities;
private int cursor; private int cursor;
Expand All @@ -53,6 +53,12 @@ public void close() throws IOException
{ {
} }


@Override
public boolean fillFrom( Chunker chunker ) throws IOException
{
return chunker.nextChunk( this );
}

@Override @Override
public char[] data() public char[] data()
{ {
Expand Down

0 comments on commit 6933dd3

Please sign in to comment.