From 6933dd392179efc1edc6fb8d5ad80b059daed8b4 Mon Sep 17 00:00:00 2001 From: Ragnar Mellbin Date: Wed, 10 Jan 2018 15:41:12 +0100 Subject: [PATCH] Changed CsvGroupInputIterator <--> InputChunk relationship By pushing down implementation details about the different types of chunks (eager vs lazy) further down. --- .../input/csv/CsvGroupInputIterator.java | 12 +- .../impl/batchimport/input/csv/CsvInput.java | 9 +- .../batchimport/input/csv/CsvInputChunk.java | 98 ++------------- .../input/csv/CsvInputChunkProxy.java | 77 ++++++++++++ .../input/csv/CsvInputIterator.java | 62 +++++----- ...nputChunk.java => EagerCsvInputChunk.java} | 10 +- .../input/csv/EagerParserChunker.java | 4 +- .../input/csv/LazyCsvInputChunk.java | 114 ++++++++++++++++++ 8 files changed, 249 insertions(+), 137 deletions(-) create mode 100644 community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/input/csv/CsvInputChunkProxy.java rename community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/input/csv/{EagerlyReadInputChunk.java => EagerCsvInputChunk.java} (90%) create mode 100644 community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/input/csv/LazyCsvInputChunk.java diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/input/csv/CsvGroupInputIterator.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/input/csv/CsvGroupInputIterator.java index ff45cf52be56c..c1106f248b488 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/input/csv/CsvGroupInputIterator.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/input/csv/CsvGroupInputIterator.java @@ -43,6 +43,7 @@ public class CsvGroupInputIterator extends InputIterator.Adapter private final Collector badCollector; private final Groups groups; private CsvInputIterator current; + private int groupId; public CsvGroupInputIterator( Iterator source, Header.Factory headerFactory, IdType idType, Configuration config, Collector badCollector, Groups groups ) @@ -56,12 +57,9 @@ public CsvGroupInputIterator( Iterator source, Header.Factory heade } @Override - public InputChunk newChunk() + public CsvInputChunkProxy newChunk() { - return config.multilineFields() - ? new EagerlyReadInputChunk() - : new CsvInputChunk( idType, config.delimiter(), badCollector, extractors( config ), - new ChunkImpl( new char[config.bufferSize()] ) ); + return new CsvInputChunkProxy(); } static Extractors extractors( Configuration config ) @@ -82,10 +80,10 @@ public synchronized boolean next( InputChunk chunk ) throws IOException } Data data = source.next().create( config ); current = new CsvInputIterator( new MultiReadable( data.stream() ), data.decorator(), - headerFactory, idType, config, groups, badCollector, extractors( config ) ); + headerFactory, idType, config, groups, badCollector, extractors( config ), groupId++ ); } - if ( current.next( chunk ) ) + if ( current.next( (CsvInputChunkProxy) chunk ) ) { return true; } diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/input/csv/CsvInput.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/input/csv/CsvInput.java index b7e884b2f697f..89d4569c67be3 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/input/csv/CsvInput.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/input/csv/CsvInput.java @@ -36,12 +36,10 @@ import org.neo4j.unsafe.impl.batchimport.input.Collector; import org.neo4j.unsafe.impl.batchimport.input.Groups; import org.neo4j.unsafe.impl.batchimport.input.Input; -import org.neo4j.unsafe.impl.batchimport.input.InputChunk; import org.neo4j.unsafe.impl.batchimport.input.InputEntity; import org.neo4j.values.storable.Value; import static org.neo4j.csv.reader.CharSeekers.charSeeker; -import static org.neo4j.helpers.collection.Iterators.iterator; import static org.neo4j.io.ByteUnit.mebiBytes; import static org.neo4j.unsafe.impl.batchimport.InputIterable.replayable; import static org.neo4j.unsafe.impl.batchimport.input.Collector.EMPTY; @@ -211,12 +209,13 @@ private long[] sample( Iterable dataFactories, Header.Factory heade ToIntFunction valueSizeCalculator, ToIntFunction additionalCalculator ) throws IOException { long[] estimates = new long[4]; // [entity count, property count, property size, labels (for nodes only)] - try ( CsvGroupInputIterator group = new CsvGroupInputIterator( iterator(), headerFactory, idType, config, EMPTY, groups ); - InputChunk chunk = group.newChunk() ) + try ( CsvInputChunkProxy chunk = new CsvInputChunkProxy() ) { // One group of input files + int groupId = 0; for ( DataFactory dataFactory : dataFactories ) // one input group { + groupId++; Header header = null; Data data = dataFactory.create( config ); RawIterator sources = data.stream(); @@ -230,7 +229,7 @@ private long[] sample( Iterable dataFactories, Header.Factory heade header = extractHeader( source, headerFactory, idType, config, groups ); } try ( CsvInputIterator iterator = new CsvInputIterator( source, data.decorator(), header, config, - idType, badCollector, extractors( config ) ); + idType, EMPTY, extractors( config ), groupId ); InputEntity entity = new InputEntity() ) { int entities = 0; diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/input/csv/CsvInputChunk.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/input/csv/CsvInputChunk.java index c601fe5a6d1c3..da277d0392b20 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/input/csv/CsvInputChunk.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/input/csv/CsvInputChunk.java @@ -20,98 +20,22 @@ package org.neo4j.unsafe.impl.batchimport.input.csv; import java.io.IOException; -import org.neo4j.csv.reader.CharReadableChunker.ChunkImpl; -import org.neo4j.csv.reader.CharSeeker; -import org.neo4j.csv.reader.Extractors; -import org.neo4j.csv.reader.Source.Chunk; -import org.neo4j.unsafe.impl.batchimport.input.Collector; + +import org.neo4j.csv.reader.Chunker; import org.neo4j.unsafe.impl.batchimport.input.InputChunk; -import org.neo4j.unsafe.impl.batchimport.input.InputEntityVisitor; /** - * {@link InputChunk} parsing next entry on each call to {@link #next(InputEntityVisitor)}. + * {@link InputChunk} that gets data from {@link Chunker}. Making it explicit in the interface simplifies implementation + * where there are different types of {@link Chunker} for different scenarios. */ -public class CsvInputChunk implements InputChunk +public interface CsvInputChunk extends InputChunk { - private final IdType idType; - private final int delimiter; - private final Collector badCollector; - private final Chunk processingChunk; - - // Set in #initialize - private CsvInputParser parser; - private Decorator decorator; - - // Set as #next is called - private InputEntityVisitor previousVisitor; - private InputEntityVisitor visitor; - private final Extractors extractors; - - public CsvInputChunk( IdType idType, int delimiter, Collector badCollector, Extractors extractors, - ChunkImpl processingChunk ) - { - this.idType = idType; - this.badCollector = badCollector; - this.extractors = extractors; - this.delimiter = delimiter; - this.processingChunk = processingChunk; - } - /** - * Called every time this chunk is updated with new data. Potentially this data is from a different - * stream of data than the previous, therefore the header and decorator is also updated. - * @param seeker {@link CharSeeker} able to seek through the data. - * @param header {@link Header} spec to read data according to. - * @param decorator additional decoration of the {@link InputEntityVisitor} coming into - * {@link #next(InputEntityVisitor)}. - * @throws IOException on I/O error. + * Fills this {@link InputChunk} from the given {@link Chunker}. + * + * @param chunker to read next chunk from. + * @return {@code true} if there was data read, otherwise {@code false}, meaning end of stream. + * @throws IOException on I/O read error. */ - boolean initialize( CharSeeker seeker, Header header, Decorator decorator ) throws IOException - { - closeCurrentParser(); - this.decorator = decorator; - this.visitor = null; - this.parser = new CsvInputParser( seeker, delimiter, idType, header, badCollector, extractors ); - if ( header.entries().length == 0 ) - { - return false; - } - return true; - } - - private void closeCurrentParser() throws IOException - { - if ( parser != null ) - { - parser.close(); - } - } - - @Override - public boolean next( InputEntityVisitor nakedVisitor ) throws IOException - { - if ( visitor == null || nakedVisitor != previousVisitor ) - { - decorateVisitor( nakedVisitor ); - } - - return parser.next( visitor ); - } - - private void decorateVisitor( InputEntityVisitor nakedVisitor ) - { - visitor = decorator.apply( nakedVisitor ); - previousVisitor = nakedVisitor; - } - - protected Chunk processingChunk() - { - return processingChunk; - } - - @Override - public void close() throws IOException - { - closeCurrentParser(); - } + boolean fillFrom( Chunker chunker ) throws IOException; } diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/input/csv/CsvInputChunkProxy.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/input/csv/CsvInputChunkProxy.java new file mode 100644 index 0000000000000..9c7baba44c220 --- /dev/null +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/input/csv/CsvInputChunkProxy.java @@ -0,0 +1,77 @@ +/* + * Copyright (c) 2002-2018 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ +package org.neo4j.unsafe.impl.batchimport.input.csv; + +import java.io.IOException; +import java.util.function.Supplier; + +import org.neo4j.csv.reader.Chunker; +import org.neo4j.unsafe.impl.batchimport.InputIterator; +import org.neo4j.unsafe.impl.batchimport.input.InputEntityVisitor; + +/** + * {@link CsvInputChunk} that adapts new input source groups during the streaming of data. + * {@link InputIterator} is fairly straight-forward, but is made a bit more complicated by the fact that + * there can be multiple different data streams. The outer iterator, {@link CsvGroupInputIterator}, is still responsible + * for handing out chunks, something that generally is good thing since it solves a bunch of other problems. + * The problem it has is that it doesn't know exactly which type of {@link CsvInputChunk} it wants to create, + * because that's up to each input group. This gap is bridged here in this class. + */ +public class CsvInputChunkProxy implements CsvInputChunk +{ + private CsvInputChunk actual; + private int groupId = -1; + + public void ensureInstantiated( Supplier newChunk, int groupId ) throws IOException + { + if ( actual == null || groupId != this.groupId ) + { + closeCurrent(); + actual = newChunk.get(); + } + this.groupId = groupId; + } + + @Override + public void close() throws IOException + { + closeCurrent(); + } + + private void closeCurrent() throws IOException + { + if ( actual != null ) + { + actual.close(); + } + } + + @Override + public boolean fillFrom( Chunker chunker ) throws IOException + { + return actual.fillFrom( chunker ); + } + + @Override + public boolean next( InputEntityVisitor visitor ) throws IOException + { + return actual.next( visitor ); + } +} diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/input/csv/CsvInputIterator.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/input/csv/CsvInputIterator.java index 486291b0697c5..7efdad3df0f0e 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/input/csv/CsvInputIterator.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/input/csv/CsvInputIterator.java @@ -21,6 +21,7 @@ import java.io.Closeable; import java.io.IOException; +import java.util.function.Supplier; import org.neo4j.csv.reader.BufferedCharSeeker; import org.neo4j.csv.reader.CharReadable; @@ -35,9 +36,8 @@ import org.neo4j.csv.reader.SourceTraceability; import org.neo4j.unsafe.impl.batchimport.input.Collector; import org.neo4j.unsafe.impl.batchimport.input.Groups; -import org.neo4j.unsafe.impl.batchimport.input.InputChunk; - import static java.util.Arrays.copyOf; +import static org.neo4j.unsafe.impl.batchimport.input.csv.CsvGroupInputIterator.extractors; /** * Iterates over one stream of data, where all data items conform to the same {@link Header}. @@ -47,26 +47,37 @@ class CsvInputIterator implements SourceTraceability, Closeable { private final CharReadable stream; private final Chunker chunker; - private final Header header; + private final int groupId; private final Decorator decorator; - private final Configuration config; + private final Supplier realInputChunkSupplier; CsvInputIterator( CharReadable stream, Decorator decorator, Header header, Configuration config, IdType idType, Collector badCollector, - Extractors extractors ) + Extractors extractors, int groupId ) { this.stream = stream; - this.config = config; this.decorator = decorator; - this.header = header; - this.chunker = config.multilineFields() - ? new EagerParserChunker( stream, idType, header, badCollector, extractors, 1_000, config, decorator ) - : new ClosestNewLineChunker( stream, config.bufferSize() ); + this.groupId = groupId; + if ( config.multilineFields() ) + { + // If we're expecting multi-line fields then there's no way to arbitrarily chunk the underlying data source + // and find record delimiters with certainty. This is why we opt for a chunker that does parsing inside + // the call that normally just hands out an arbitrary amount of characters to parse outside and in parallel. + // This chunker is single-threaded, as it was previously too and keeps the functionality of multi-line fields. + this.chunker = new EagerParserChunker( stream, idType, header, badCollector, extractors, 1_000, config, decorator ); + this.realInputChunkSupplier = EagerCsvInputChunk::new; + } + else + { + this.chunker = new ClosestNewLineChunker( stream, config.bufferSize() ); + this.realInputChunkSupplier = () -> new LazyCsvInputChunk( idType, config.delimiter(), badCollector, + extractors( config ), chunker.newChunk(), config, decorator, header ); + } } - CsvInputIterator( CharReadable stream, Decorator decorator, Header.Factory headerFactory, IdType idType, - Configuration config, Groups groups, Collector badCollector, Extractors extractors ) throws IOException + CsvInputIterator( CharReadable stream, Decorator decorator, Header.Factory headerFactory, IdType idType, Configuration config, Groups groups, + Collector badCollector, Extractors extractors, int groupId ) throws IOException { - this( stream, decorator, extractHeader( stream, headerFactory, idType, config, groups ), config, idType, badCollector, extractors ); + this( stream, decorator, extractHeader( stream, headerFactory, idType, config, groups ), config, idType, badCollector, extractors, groupId ); } static Header extractHeader( CharReadable stream, Header.Factory headerFactory, IdType idType, @@ -86,27 +97,10 @@ static Header extractHeader( CharReadable stream, Header.Factory headerFactory, return headerFactory.create( null, null, null, null ); } - public boolean next( InputChunk chunk ) throws IOException - { - if ( config.multilineFields() ) - { - EagerlyReadInputChunk csvChunk = (EagerlyReadInputChunk) chunk; - return chunker.nextChunk( csvChunk ); - } - - CsvInputChunk csvChunk = (CsvInputChunk) chunk; - Chunk processingChunk = csvChunk.processingChunk(); - if ( chunker.nextChunk( processingChunk ) ) - { - return initialized( chunk, seeker( processingChunk, config ) ); - } - return false; - } - - private boolean initialized( InputChunk chunk, CharSeeker seeker ) throws IOException + public boolean next( CsvInputChunkProxy proxy ) throws IOException { - CsvInputChunk csvChunk = (CsvInputChunk) chunk; - return csvChunk.initialize( seeker, header.clone(), decorator ); + proxy.ensureInstantiated( realInputChunkSupplier, groupId ); + return proxy.fillFrom( chunker ); } @Override @@ -128,7 +122,7 @@ public long position() return chunker.position(); } - private static CharSeeker seeker( Chunk chunk, Configuration config ) + static CharSeeker seeker( Chunk chunk, Configuration config ) { return new BufferedCharSeeker( Source.singleChunk( chunk ), config ); } diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/input/csv/EagerlyReadInputChunk.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/input/csv/EagerCsvInputChunk.java similarity index 90% rename from community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/input/csv/EagerlyReadInputChunk.java rename to community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/input/csv/EagerCsvInputChunk.java index 111a2d2e54681..dc05c92174221 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/input/csv/EagerlyReadInputChunk.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/input/csv/EagerCsvInputChunk.java @@ -21,12 +21,12 @@ import java.io.IOException; +import org.neo4j.csv.reader.Chunker; import org.neo4j.csv.reader.Source; -import org.neo4j.unsafe.impl.batchimport.input.InputChunk; import org.neo4j.unsafe.impl.batchimport.input.InputEntity; import org.neo4j.unsafe.impl.batchimport.input.InputEntityVisitor; -class EagerlyReadInputChunk implements InputChunk, Source.Chunk +class EagerCsvInputChunk implements CsvInputChunk, Source.Chunk { private InputEntity[] entities; private int cursor; @@ -53,6 +53,12 @@ public void close() throws IOException { } + @Override + public boolean fillFrom( Chunker chunker ) throws IOException + { + return chunker.nextChunk( this ); + } + @Override public char[] data() { diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/input/csv/EagerParserChunker.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/input/csv/EagerParserChunker.java index 9cd1a5305df8c..3e466ac33806f 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/input/csv/EagerParserChunker.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/input/csv/EagerParserChunker.java @@ -33,7 +33,7 @@ /** * {@link Chunker} which parses a chunk of entities when calling {@link #nextChunk(Chunk)}, - * injecting them into {@link EagerlyReadInputChunk}, which simply hands them out one by one. + * injecting them into {@link EagerCsvInputChunk}, which simply hands them out one by one. */ public class EagerParserChunker implements Chunker { @@ -63,7 +63,7 @@ public boolean nextChunk( Chunk chunk ) throws IOException if ( cursor > 0 ) { - ((EagerlyReadInputChunk)chunk).initialize( entities.toArray() ); + ((EagerCsvInputChunk)chunk).initialize( entities.toArray() ); return true; } return false; diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/input/csv/LazyCsvInputChunk.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/input/csv/LazyCsvInputChunk.java new file mode 100644 index 0000000000000..2de0ebcb99a6b --- /dev/null +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/input/csv/LazyCsvInputChunk.java @@ -0,0 +1,114 @@ +/* + * Copyright (c) 2002-2018 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ +package org.neo4j.unsafe.impl.batchimport.input.csv; + +import java.io.IOException; + +import org.neo4j.csv.reader.Chunker; +import org.neo4j.csv.reader.Extractors; +import org.neo4j.csv.reader.Source.Chunk; +import org.neo4j.unsafe.impl.batchimport.input.Collector; +import org.neo4j.unsafe.impl.batchimport.input.InputChunk; +import org.neo4j.unsafe.impl.batchimport.input.InputEntityVisitor; + +import static org.neo4j.unsafe.impl.batchimport.input.csv.CsvInputIterator.seeker; + +/** + * {@link InputChunk} parsing next entry on each call to {@link #next(InputEntityVisitor)}. + */ +public class LazyCsvInputChunk implements CsvInputChunk +{ + private final IdType idType; + private final int delimiter; + private final Collector badCollector; + private final Chunk processingChunk; + private final Configuration config; + private final Decorator decorator; + private final Header header; + private final Extractors extractors; + + // Set in #fillFrom + private CsvInputParser parser; + + // Set as #next is called + private InputEntityVisitor previousVisitor; + private InputEntityVisitor visitor; + + public LazyCsvInputChunk( IdType idType, int delimiter, Collector badCollector, Extractors extractors, Chunk processingChunk, Configuration config, + Decorator decorator, Header header ) + { + this.idType = idType; + this.badCollector = badCollector; + this.extractors = extractors; + this.delimiter = delimiter; + this.processingChunk = processingChunk; + this.config = config; + this.decorator = decorator; + this.header = header; + } + + @Override + public boolean fillFrom( Chunker chunker ) throws IOException + { + if ( chunker.nextChunk( processingChunk ) ) + { + closeCurrentParser(); + this.visitor = null; + this.parser = new CsvInputParser( seeker( processingChunk, config ), delimiter, idType, header.clone(), badCollector, extractors ); + if ( header.entries().length == 0 ) + { + return false; + } + return true; + } + return false; + } + + private void closeCurrentParser() throws IOException + { + if ( parser != null ) + { + parser.close(); + } + } + + @Override + public boolean next( InputEntityVisitor nakedVisitor ) throws IOException + { + if ( visitor == null || nakedVisitor != previousVisitor ) + { + decorateVisitor( nakedVisitor ); + } + + return parser.next( visitor ); + } + + private void decorateVisitor( InputEntityVisitor nakedVisitor ) + { + visitor = decorator.apply( nakedVisitor ); + previousVisitor = nakedVisitor; + } + + @Override + public void close() throws IOException + { + closeCurrentParser(); + } +}