From a683fb80392e384e616282b79cbe4832b6168cd6 Mon Sep 17 00:00:00 2001
From: Mattias Persson
Date: Tue, 19 Jul 2016 11:26:49 +0200
Subject: [PATCH] CsvInput parses data in parallel

Takes advantage of being able to determine where one data row ends and
the next one starts when multi-line fields are disallowed. In that case
chunks of csv data can be read and handed off to one or more threads
parsing the data, to finally end up merged back in the same order as
they were read from the csv source. From the outside there's no
difference, it still provides `InputIterator` for nodes and
relationships.
---
 .../neo4j/csv/reader/ProcessingSource.java    | 230 ++++++++++++++++++
 .../csv/reader/ProcessingSourceTest.java      | 219 +++++++++++++++++
 .../java/org/neo4j/tooling/ImportTool.java    |  11 +-
 .../neo4j/tooling/SimpleDataGenerator.java    |   3 +-
 .../neo4j/tooling/StringDeserialization.java  |   9 +-
 .../cache/NodeRelationshipCache.java          |   5 +-
 .../executor/DynamicTaskExecutor.java         |   4 +-
 .../executor/TaskExecutionPanicException.java |  33 +++
 .../unsafe/impl/batchimport/input/Inputs.java |   4 +-
 .../impl/batchimport/input/csv/CsvInput.java  |  38 +--
 .../impl/batchimport/input/csv/Data.java      |   3 +-
 .../batchimport/input/csv/DataFactories.java  |   9 +-
 .../csv/ExternalPropertiesDecorator.java      |   4 +-
 .../input/csv/InputGroupsDeserializer.java    |  79 ++++--
 .../csv/ParallelInputEntityDeserializer.java  | 216 ++++++++++++++++
 .../input/csv/CsvInputBatchImportIT.java      |   4 +-
 .../batchimport/input/csv/CsvInputTest.java   |  97 ++++----
 .../input/csv/DataFactoriesTest.java          |   2 +-
 .../csv/InputGroupsDeserializerTest.java      |  54 ++--
 .../ParallelInputEntityDeserializerTest.java  | 128 ++++++++++
 20 files changed, 1018 insertions(+), 134 deletions(-)
 create mode 100644 community/csv/src/main/java/org/neo4j/csv/reader/ProcessingSource.java
 create mode 100644 community/csv/src/test/java/org/neo4j/csv/reader/ProcessingSourceTest.java
 create mode 100644 community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/executor/TaskExecutionPanicException.java
 create mode 100644 community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/input/csv/ParallelInputEntityDeserializer.java
 create mode 100644 community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/input/csv/ParallelInputEntityDeserializerTest.java

diff --git a/community/csv/src/main/java/org/neo4j/csv/reader/ProcessingSource.java b/community/csv/src/main/java/org/neo4j/csv/reader/ProcessingSource.java
new file mode 100644
index 0000000000000..7ab1f23284f72
--- /dev/null
+++ b/community/csv/src/main/java/org/neo4j/csv/reader/ProcessingSource.java
@@ -0,0 +1,230 @@
+/*
+ * Copyright (c) 2002-2016 "Neo Technology,"
+ * Network Engine for Objects in Lund AB [http://neotechnology.com]
+ *
+ * This file is part of Neo4j.
+ *
+ * Neo4j is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program.  If not, see <http://www.gnu.org/licenses/>.
+ */
+package org.neo4j.csv.reader;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.concurrent.atomic.AtomicReferenceArray;
+
+import org.neo4j.csv.reader.Source.Chunk;
+
+/**
+ * In a scenario where there's one reader reading chunks of data, handing those chunks to one or
+ * more processors (parsers) of that data, this class comes in handy. This pattern allows for
+ * multiple {@link BufferedCharSeeker seeker instances}, each operating over one chunk, not transitioning itself
+ * into the next.
+ */
+public class ProcessingSource implements Closeable
+{
+    // Marker for a buffer slot being unallocated
+    private static final char[] UNALLOCATED = new char[0];
+    // Marker for a buffer slot being allocated, but whose buffer is currently in use
+    private static final char[] IN_USE = new char[0];
+
+    private final CharReadable reader;
+    private final int chunkSize;
+    private char[] backBuffer; // grows on demand
+    private int backBufferCursor;
+
+    // Buffer reuse. Each item starts out as UNALLOCATED, transitions into IN_USE when tied to a Chunk,
+    // which will put its allocated buffer back into that slot on Chunk#close(). After that it flips between
+    // an allocated char[] and IN_USE.
+    private final AtomicReferenceArray<char[]> buffers;
+
+    public ProcessingSource( CharReadable reader, int chunkSize, int maxNumberOfBufferedChunks )
+    {
+        this.reader = reader;
+        this.chunkSize = chunkSize;
+        this.backBuffer = new char[chunkSize >> 4];
+        this.buffers = new AtomicReferenceArray<>( maxNumberOfBufferedChunks );
+        for ( int i = 0; i < buffers.length(); i++ )
+        {
+            buffers.set( i, UNALLOCATED );
+        }
+    }
+
+    /**
+     * Must be called by a single thread, the same thread every time.
+     *
+     * @return the next {@link Chunk} of data, ending with a newline, except possibly for the last chunk.
+     * @throws IOException on reading error.
+     */
+    public Chunk nextChunk() throws IOException
+    {
+        Buffer buffer = newBuffer();
+        int offset = 0;
+
+        if ( backBufferCursor > 0 )
+        {   // Read from and reset back buffer
+            assert backBufferCursor < chunkSize;
+            System.arraycopy( backBuffer, 0, buffer.data, 0, backBufferCursor );
+            offset += backBufferCursor;
+            backBufferCursor = 0;
+        }
+
+        int leftToRead = chunkSize - offset;
+        int read = reader.read( buffer.data, offset, leftToRead );
+        if ( read == leftToRead )
+        {   // Read from reader. We read data into the whole buffer and there seems to be more data left in reader.
+            // This means we're most likely not at the end, so seek backwards to the last newline character and
+            // put the characters after the newline character(s) into the back buffer.
+            int newlineOffset = offsetOfLastNewline( buffer.data );
+            if ( newlineOffset > -1 )
+            {   // We found a newline character some characters back
+                backBufferCursor = chunkSize - (newlineOffset + 1);
+                System.arraycopy( buffer.data, newlineOffset + 1, backBuffer( backBufferCursor ), 0, backBufferCursor );
+                read -= backBufferCursor;
+            }
+            else
+            {   // There was no newline character in the entire buffer
+                throw new IllegalStateException( "No newline character found in the whole buffer of size " +
+                        chunkSize + ", which isn't supported at the moment" );
+            }
+        }
+        // else we couldn't completely fill the buffer, which means that we're at the end of the data source; we're good.
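+        // Worked example (values taken from ProcessingSourceTest): with chunkSize 12 and input
+        // "1234567\n8901234\n5678901234", the first call fills the buffer with "1234567\n8901",
+        // backs up to the last newline, stashes "8901" in the back buffer and returns the chunk
+        // "1234567\n"; the stashed characters then seed the next chunk.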
+ + if ( read > -1 ) + { + offset += read; + } + + return new ProcessingChunk( buffer, offset, reader.sourceDescription() ); + } + + private char[] backBuffer( int length ) + { + if ( length > backBuffer.length ) + { + backBuffer = Arrays.copyOf( backBuffer, length ); + } + return backBuffer; + } + + private Buffer newBuffer() + { + // Scan through the array to find one + for ( int i = 0; i < buffers.length(); i++ ) + { + char[] current = buffers.get( i ); + if ( current == UNALLOCATED || current != IN_USE ) + { + // Mark that this buffer is currently being used + buffers.set( i, IN_USE ); + return new Buffer( current == UNALLOCATED ? new char[chunkSize] : current, i ); + } + } + + // With external push-back this shouldn't be an issue, but instead of introducing blocking + // here just fall back to creating a new buffer which will not be eligible for reuse. + return new Buffer( new char[chunkSize], -1 ); + } + + @Override + public void close() throws IOException + { + reader.close(); + } + + private static int offsetOfLastNewline( char[] buffer ) + { + for ( int i = buffer.length-1; i >= 0; i-- ) + { + if ( buffer[i] == '\n' ) + { + return i; + } + } + return -1; + } + + private class ProcessingChunk implements Chunk + { + private final Buffer buffer; + private final int length; + private final String sourceDescription; + + public ProcessingChunk( Buffer buffer, int length, String sourceDescription ) + { + this.buffer = buffer; + this.length = length; + this.sourceDescription = sourceDescription; + } + + @Override + public int startPosition() + { + return 0; + } + + @Override + public String sourceDescription() + { + return sourceDescription; + } + + @Override + public int maxFieldSize() + { + return chunkSize; + } + + @Override + public int length() + { + return length; + } + + @Override + public char[] data() + { + return buffer.data; + } + + @Override + public int backPosition() + { + return 0; + } + + @Override + public void close() + { + if ( buffer.reuseIndex != -1 ) + { + // Give the buffer back to the source so that it can be reused + buffers.set( buffer.reuseIndex, buffer.data ); + } + // else this was a detached buffer which we cannot really put back into a reuse slot + } + } + + private static class Buffer + { + private final char[] data; + private final int reuseIndex; + + Buffer( char[] data, int reuseIndex ) + { + this.data = data; + this.reuseIndex = reuseIndex; + } + } +} diff --git a/community/csv/src/test/java/org/neo4j/csv/reader/ProcessingSourceTest.java b/community/csv/src/test/java/org/neo4j/csv/reader/ProcessingSourceTest.java new file mode 100644 index 0000000000000..6e15e19a26b61 --- /dev/null +++ b/community/csv/src/test/java/org/neo4j/csv/reader/ProcessingSourceTest.java @@ -0,0 +1,219 @@ +/* + * Copyright (c) 2002-2016 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . 
+ */ +package org.neo4j.csv.reader; + +import org.junit.Test; + +import java.io.IOException; +import java.io.StringReader; +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.atomic.AtomicInteger; + +import org.neo4j.csv.reader.Source.Chunk; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotSame; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import static java.util.Arrays.copyOfRange; +import static java.util.concurrent.Executors.newFixedThreadPool; +import static java.util.concurrent.TimeUnit.SECONDS; + +import static org.neo4j.csv.reader.Source.EMPTY_CHUNK; + +public class ProcessingSourceTest +{ + @Test + public void shouldBackUpChunkToClosestNewline() throws Exception + { + // GIVEN + CharReadable reader = Readables.wrap( new StringReader( "1234567\n8901234\n5678901234" ) ); + // (next chunks): ^ ^ + // (actual chunks): ^ ^ + try ( ProcessingSource source = new ProcessingSource( reader, 12, 1 ) ) + { + // WHEN + Chunk first = source.nextChunk(); + assertArrayEquals( "1234567\n".toCharArray(), charactersOf( first ) ); + Chunk second = source.nextChunk(); + assertArrayEquals( "8901234\n".toCharArray(), charactersOf( second ) ); + Chunk third = source.nextChunk(); + assertArrayEquals( "5678901234".toCharArray(), charactersOf( third ) ); + + // THEN + assertEquals( 0, source.nextChunk().length() ); + } + } + + @Test + public void shouldFailIfNoNewlineInChunk() throws Exception + { + // GIVEN + CharReadable reader = Readables.wrap( new StringReader( "1234567\n89012345678901234" ) ); + // (next chunks): ^ + // (actual chunks): ^ + try ( ProcessingSource source = new ProcessingSource( reader, 12, 1 ) ) + { + // WHEN + Chunk first = source.nextChunk(); + assertArrayEquals( "1234567\n".toCharArray(), charactersOf( first ) ); + try + { + source.nextChunk(); + fail( "Should have failed here" ); + } + catch ( IllegalStateException e ) + { + // THEN good + } + } + } + + @Test + public void shouldReuseBuffers() throws Exception + { + // GIVEN + ProcessingSource source = new ProcessingSource( dataWithLines( 2 ), 100, 1 ); + + // WHEN + Chunk firstChunk = source.nextChunk(); + char[] firstBuffer = firstChunk.data(); + firstChunk.close(); + + // THEN + Chunk secondChunk = source.nextChunk(); + char[] secondBuffer = secondChunk.data(); + secondChunk.close(); + assertSame( firstBuffer, secondBuffer ); + source.close(); + } + + @Test + public void shouldReuseBuffersEventually() throws Exception + { + // GIVEN + ProcessingSource source = new ProcessingSource( dataWithLines( 5 ), 100, 2 ); + Chunk firstChunk = source.nextChunk(); + char[] firstBuffer = firstChunk.data(); + + // WHEN + Chunk secondChunk = source.nextChunk(); + char[] secondBuffer = secondChunk.data(); + assertNotSame( secondBuffer, firstBuffer ); + + // THEN + firstChunk.close(); + Chunk thirdChunk = source.nextChunk(); + char[] thirdBuffer = thirdChunk.data(); + assertSame( firstBuffer, thirdBuffer ); + + secondChunk.close(); + thirdChunk.close(); + source.close(); + } + + @Test + public void shouldStressReuse() throws Exception + { + // GIVEN + int nThreads = 10; + ProcessingSource source = new ProcessingSource( dataWithLines( 3_000 ), 100, nThreads ); + ExecutorService executor = newFixedThreadPool( nThreads ); + AtomicInteger activeProcessors = new AtomicInteger(); + + // WHEN + Chunk chunk = 
EMPTY_CHUNK; + Set observedDataArrays = new HashSet<>(); + do + { + while ( activeProcessors.get() == nThreads ) + { // Provide push-back which normally happens when using a ProcessingSource, although perhaps not + } // with a busy-wait like this, but that's not really important. + + // Get next chunk and register the array instance we got + chunk = source.nextChunk(); + observedDataArrays.add( chunk.data() ); + + // Update data for push-back of the load in this test + activeProcessors.incrementAndGet(); + + // Submit this chunk for processing (no-op) and closing (reuse) + Chunk currentChunk = chunk; + executor.submit( () -> + { + currentChunk.close(); + activeProcessors.decrementAndGet(); + } ); + } + while ( chunk.length() > 0 ); + executor.shutdown(); + executor.awaitTermination( 100, SECONDS ); + + // THEN + source.close(); + assertTrue( "" + observedDataArrays.size(), + observedDataArrays.size() >= 1 && observedDataArrays.size() <= nThreads ); + } + + private CharReadable dataWithLines( int lineCount ) + { + return new CharReadable.Adapter() + { + private int line; + + @Override + public String sourceDescription() + { + return "test"; + } + + @Override + public int read( char[] into, int offset, int length ) throws IOException + { + assert offset == 0 : "This test assumes offset is 0, " + + "which it always was for this use case at the time of writing"; + if ( line++ == lineCount ) + { + return -1; + } + + // We cheat here and simply say that we read the requested amount of characters + into[length-1] = '\n'; + return length; + } + + @Override + public SectionedCharBuffer read( SectionedCharBuffer buffer, int from ) throws IOException + { + throw new UnsupportedOperationException(); + } + }; + } + + private char[] charactersOf( Chunk chunk ) + { + return copyOfRange( chunk.data(), chunk.startPosition(), chunk.startPosition() + chunk.length() ); + } +} diff --git a/community/import-tool/src/main/java/org/neo4j/tooling/ImportTool.java b/community/import-tool/src/main/java/org/neo4j/tooling/ImportTool.java index 92c34dc5e96d0..a37c9285ad3f5 100644 --- a/community/import-tool/src/main/java/org/neo4j/tooling/ImportTool.java +++ b/community/import-tool/src/main/java/org/neo4j/tooling/ImportTool.java @@ -319,6 +319,7 @@ public static void main( String[] incomingArguments, boolean defaultSettingsSuit boolean skipBadRelationships, skipDuplicateNodes, ignoreExtraColumns; Config dbConfig; OutputStream badOutput = null; + org.neo4j.unsafe.impl.batchimport.Configuration configuration = null; boolean success = false; try @@ -348,11 +349,13 @@ public static void main( String[] incomingArguments, boolean defaultSettingsSuit Collector badCollector = badCollector( badOutput, badTolerance, collect( skipBadRelationships, skipDuplicateNodes, ignoreExtraColumns ) ); - input = new CsvInput( nodeData( inputEncoding, nodesFiles ), defaultFormatNodeFileHeader(), - relationshipData( inputEncoding, relationshipsFiles ), defaultFormatRelationshipFileHeader(), - idType, csvConfiguration( args, defaultSettingsSuitableForTests ), badCollector ); dbConfig = loadDbConfig( args.interpretOption( Options.DATABASE_CONFIG.key(), Converters.optional(), Converters.toFile(), Validators.REGEX_FILE_EXISTS ) ); + configuration = importConfiguration( processors, defaultSettingsSuitableForTests, dbConfig ); + input = new CsvInput( nodeData( inputEncoding, nodesFiles ), defaultFormatNodeFileHeader(), + relationshipData( inputEncoding, relationshipsFiles ), defaultFormatRelationshipFileHeader(), + idType, csvConfiguration( args, 
defaultSettingsSuitableForTests ), badCollector, + configuration.maxNumberOfProcessors() ); success = true; } catch ( IllegalArgumentException e ) @@ -376,8 +379,6 @@ public static void main( String[] incomingArguments, boolean defaultSettingsSuit LogService logService = life.add( StoreLogService.inLogsDirectory( fs, storeDir ) ); life.start(); - org.neo4j.unsafe.impl.batchimport.Configuration configuration = - importConfiguration( processors, defaultSettingsSuitableForTests, dbConfig ); BatchImporter importer = new ParallelBatchImporter( storeDir, configuration, logService, diff --git a/community/import-tool/src/test/java/org/neo4j/tooling/SimpleDataGenerator.java b/community/import-tool/src/test/java/org/neo4j/tooling/SimpleDataGenerator.java index c480b49bb1c70..ef36aaa7bfb3b 100644 --- a/community/import-tool/src/test/java/org/neo4j/tooling/SimpleDataGenerator.java +++ b/community/import-tool/src/test/java/org/neo4j/tooling/SimpleDataGenerator.java @@ -40,6 +40,7 @@ public class SimpleDataGenerator extends SourceTraceability.Adapter private final Distribution relationshipTypes; private final Groups groups = new Groups(); private final IdType idType; + private final String className = getClass().getSimpleName(); public SimpleDataGenerator( Header nodeHeader, Header relationshipHeader, long randomSeed, long nodeCount, int labelCount, int relationshipTypeCount, IdType idType ) @@ -82,6 +83,6 @@ private static String[] tokens( String prefix, int count ) @Override public String sourceDescription() { - return getClass().getSimpleName(); + return className; } } diff --git a/community/import-tool/src/test/java/org/neo4j/tooling/StringDeserialization.java b/community/import-tool/src/test/java/org/neo4j/tooling/StringDeserialization.java index 377dcb3bd4e1f..38667cea1acd0 100644 --- a/community/import-tool/src/test/java/org/neo4j/tooling/StringDeserialization.java +++ b/community/import-tool/src/test/java/org/neo4j/tooling/StringDeserialization.java @@ -53,10 +53,13 @@ public void handle( Entry entry, Object value ) { builder.append( config.delimiter() ); } - stringify( entry, value ); + if ( value != null ) + { + stringify( value ); + } } - private void stringify( Entry entry, Object value ) + private void stringify( Object value ) { if ( value instanceof String ) { @@ -82,7 +85,7 @@ else if ( value.getClass().isArray() ) { builder.append( config.arrayDelimiter() ); } - stringify( entry, item ); + stringify( item ); } } else if ( value instanceof Number ) diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/cache/NodeRelationshipCache.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/cache/NodeRelationshipCache.java index 8a3f0ecdf5a50..336aaf1e88389 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/cache/NodeRelationshipCache.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/cache/NodeRelationshipCache.java @@ -515,7 +515,10 @@ long putRelationship( long relGroupIndex, Direction direction, @Override public void close() { - array.close(); + if ( array != null ) + { + array.close(); + } } @Override diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/executor/DynamicTaskExecutor.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/executor/DynamicTaskExecutor.java index 9c51240e38ce7..b4ee66b416119 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/executor/DynamicTaskExecutor.java +++ 
b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/executor/DynamicTaskExecutor.java @@ -132,11 +132,11 @@ public void assertHealthy() { if ( panic != null ) { - throw new IllegalStateException( "Executor has been shut down in panic", panic ); + throw new TaskExecutionPanicException( "Executor has been shut down in panic", panic ); } if ( abortQueued ) { - throw new IllegalStateException( "Executor has been shut down, aborting queued" ); + throw new TaskExecutionPanicException( "Executor has been shut down, aborting queued" ); } } } diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/executor/TaskExecutionPanicException.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/executor/TaskExecutionPanicException.java new file mode 100644 index 0000000000000..41aaa7512e269 --- /dev/null +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/executor/TaskExecutionPanicException.java @@ -0,0 +1,33 @@ +/* + * Copyright (c) 2002-2016 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ +package org.neo4j.unsafe.impl.batchimport.executor; + +public class TaskExecutionPanicException extends IllegalStateException +{ + public TaskExecutionPanicException( String message, Throwable cause ) + { + super( message, cause ); + } + + public TaskExecutionPanicException( String message ) + { + super( message ); + } +} diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/input/Inputs.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/input/Inputs.java index 1d2911536be91..06685802db83f 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/input/Inputs.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/input/Inputs.java @@ -79,12 +79,12 @@ public Collector badCollector() } public static Input csv( File nodes, File relationships, IdType idType, - Configuration configuration, Collector badCollector ) + Configuration configuration, Collector badCollector, int maxProcessors ) { return new CsvInput( nodeData( data( NO_NODE_DECORATOR, defaultCharset(), nodes ) ), defaultFormatNodeFileHeader(), relationshipData( data( NO_RELATIONSHIP_DECORATOR, defaultCharset(), relationships ) ), defaultFormatRelationshipFileHeader(), idType, configuration, - badCollector ); + badCollector, maxProcessors ); } } diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/input/csv/CsvInput.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/input/csv/CsvInput.java index 3c0558d3d841b..9c053c6a1d0d3 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/input/csv/CsvInput.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/input/csv/CsvInput.java @@ -21,8 +21,6 @@ import java.util.HashMap; import 
java.util.Map; -import java.util.function.Function; - import org.neo4j.csv.reader.CharSeeker; import org.neo4j.kernel.impl.util.Validators; import org.neo4j.unsafe.impl.batchimport.InputIterable; @@ -50,6 +48,7 @@ public class CsvInput implements Input private final Configuration config; private final Groups groups = new Groups(); private final Collector badCollector; + private final int maxProcessors; /** * @param nodeDataFactory multiple {@link DataFactory} instances providing data, each {@link DataFactory} @@ -62,12 +61,15 @@ public class CsvInput implements Input * @param relationshipHeaderFactory factory for reading relationship headers. * @param idType {@link IdType} to expect in id fields of node and relationship input. * @param config CSV configuration. + * @param badCollector Collector getting calls about bad input data. + * @param maxProcessors maximum number of processors in scenarios where multiple threads may parse CSV data. */ public CsvInput( Iterable> nodeDataFactory, Header.Factory nodeHeaderFactory, Iterable> relationshipDataFactory, Header.Factory relationshipHeaderFactory, - IdType idType, Configuration config, Collector badCollector ) + IdType idType, Configuration config, Collector badCollector, int maxProcessors ) { + this.maxProcessors = maxProcessors; assertSaneConfiguration( config ); this.nodeDataFactory = nodeDataFactory; @@ -105,18 +107,11 @@ public InputIterable nodes() @Override public InputIterator iterator() { - return new InputGroupsDeserializer( nodeDataFactory.iterator(), - nodeHeaderFactory, config, idType ) - { - @Override - protected InputEntityDeserializer entityDeserializer( CharSeeker dataStream, - Header dataHeader, Function decorator ) - { - return new InputEntityDeserializer<>( dataHeader, dataStream, config.delimiter(), + return new InputGroupsDeserializer<>( nodeDataFactory.iterator(), + nodeHeaderFactory, config, idType, maxProcessors, (dataStream, dataHeader, decorator) -> + new InputEntityDeserializer<>( dataHeader, dataStream, config.delimiter(), new InputNodeDeserialization( dataStream, dataHeader, groups, idType.idsAreExternal() ), - decorator, Validators.emptyValidator(), badCollector ); - } - }; + decorator, Validators.emptyValidator(), badCollector ), InputNode.class ); } @Override @@ -135,18 +130,11 @@ public InputIterable relationships() @Override public InputIterator iterator() { - return new InputGroupsDeserializer( relationshipDataFactory.iterator(), - relationshipHeaderFactory, config, idType ) - { - @Override - protected InputEntityDeserializer entityDeserializer( CharSeeker dataStream, - Header dataHeader, Function decorator ) - { - return new InputEntityDeserializer<>( dataHeader, dataStream, config.delimiter(), + return new InputGroupsDeserializer<>( relationshipDataFactory.iterator(), + relationshipHeaderFactory, config, idType, maxProcessors, (dataStream, dataHeader, decorator) -> + new InputEntityDeserializer<>( dataHeader, dataStream, config.delimiter(), new InputRelationshipDeserialization( dataStream, dataHeader, groups ), - decorator, new InputRelationshipValidator(), badCollector ); - } - }; + decorator, new InputRelationshipValidator(), badCollector ), InputRelationship.class ); } @Override diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/input/csv/Data.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/input/csv/Data.java index 626d0d3f0c5b2..91be735973ed1 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/input/csv/Data.java +++ 
b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/input/csv/Data.java @@ -21,6 +21,7 @@ import java.util.function.Function; +import org.neo4j.csv.reader.CharReadable; import org.neo4j.csv.reader.CharSeeker; import org.neo4j.unsafe.impl.batchimport.input.InputEntity; @@ -30,7 +31,7 @@ */ public interface Data { - CharSeeker stream(); + CharReadable stream(); Function decorator(); } diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/input/csv/DataFactories.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/input/csv/DataFactories.java index d305489836824..c9dfd9a5679c6 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/input/csv/DataFactories.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/input/csv/DataFactories.java @@ -45,7 +45,6 @@ import org.neo4j.unsafe.impl.batchimport.input.MissingHeaderException; import org.neo4j.unsafe.impl.batchimport.input.csv.Header.Entry; -import static org.neo4j.csv.reader.CharSeekers.charSeeker; import static org.neo4j.csv.reader.Readables.files; /** @@ -70,11 +69,11 @@ public static DataFactory data( final Funct return config -> new Data() { @Override - public CharSeeker stream() + public CharReadable stream() { try { - return charSeeker( files( charset, files ), config, true ); + return files( charset, files ); } catch ( IOException e ) { @@ -101,9 +100,9 @@ public static DataFactory data( final Funct return config -> new Data() { @Override - public CharSeeker stream() + public CharReadable stream() { - return charSeeker( readable.get(), config, true ); + return readable.get(); } @Override diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/input/csv/ExternalPropertiesDecorator.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/input/csv/ExternalPropertiesDecorator.java index b3dac30ef7c58..66d8fb86c8ce3 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/input/csv/ExternalPropertiesDecorator.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/input/csv/ExternalPropertiesDecorator.java @@ -28,6 +28,8 @@ import org.neo4j.unsafe.impl.batchimport.input.InputNode; import org.neo4j.unsafe.impl.batchimport.input.UpdateBehaviour; +import static org.neo4j.csv.reader.CharSeekers.charSeeker; + /** * Pulls in properties from an external CSV source and amends them to the "main" input nodes. 
* Imagine some node input source: @@ -62,7 +64,7 @@ public ExternalPropertiesDecorator( DataFactory data, Header.Factory Configuration config, IdType idType, UpdateBehaviour updateBehaviour, Collector badCollector ) { this.updateBehaviour = updateBehaviour; - CharSeeker dataStream = data.create( config ).stream(); + CharSeeker dataStream = charSeeker( data.create( config ).stream(), config, true ); Header header = headerFactory.create( dataStream, config, idType ); this.deserializer = new InputEntityDeserializer<>( header, dataStream, config.delimiter(), new InputNodeDeserialization( dataStream, header, new Groups(), idType.idsAreExternal() ), diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/input/csv/InputGroupsDeserializer.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/input/csv/InputGroupsDeserializer.java index 784eb866285b8..68df138e18c7d 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/input/csv/InputGroupsDeserializer.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/input/csv/InputGroupsDeserializer.java @@ -21,18 +21,18 @@ import java.util.Iterator; import java.util.function.Function; - import org.neo4j.csv.reader.CharSeeker; import org.neo4j.helpers.collection.NestingIterator; import org.neo4j.unsafe.impl.batchimport.InputIterator; import org.neo4j.unsafe.impl.batchimport.input.InputEntity; +import static org.neo4j.csv.reader.CharSeekers.charSeeker; /** * Able to deserialize one input group. An input group is a list of one or more input files containing * its own header. An import can read multiple input groups. Each group is deserialized by * {@link InputEntityDeserializer}. */ -abstract class InputGroupsDeserializer +class InputGroupsDeserializer extends NestingIterator> implements InputIterator { @@ -41,15 +41,30 @@ abstract class InputGroupsDeserializer private final IdType idType; private InputIterator currentInput = new InputIterator.Empty<>(); private long previousInputsCollectivePositions; + private int previousInputProcessors = 1; private boolean currentInputOpen; + private final int maxProcessors; + private final DeserializerFactory factory; + private final Class entityClass; + + @FunctionalInterface + public interface DeserializerFactory + { + InputEntityDeserializer create( CharSeeker dataStream, Header dataHeader, + Function decorator ); + } InputGroupsDeserializer( Iterator> dataFactory, Header.Factory headerFactory, - Configuration config, IdType idType ) + Configuration config, IdType idType, int maxProcessors, DeserializerFactory factory, + Class entityClass ) { super( dataFactory ); this.headerFactory = headerFactory; this.config = config; this.idType = idType; + this.maxProcessors = maxProcessors; + this.factory = factory; + this.entityClass = entityClass; } @Override @@ -59,19 +74,41 @@ protected InputIterator createNestedIterator( DataFactory dataFa // Open the data stream. It's closed by the batch importer when execution is done. Data data = dataFactory.create( config ); - CharSeeker dataStream = data.stream(); - - // Read the header, given the data stream. This allows the header factory to be able to - // parse the header from the data stream directly. Or it can decide to grab the header - // from somewhere else, it's up to that factory. 
- Header dataHeader = headerFactory.create( dataStream, config, idType ); - - InputEntityDeserializer input = entityDeserializer( dataStream, dataHeader, data.decorator() ); - // It's important that we assign currentInput before calling initialize(), so that if something - // goes wrong in initialize() and our close() is called we close it properly. - currentInput = input; - currentInputOpen = true; - input.initialize(); + if ( config.multilineFields() ) + { + // Use a single-threaded reading and parsing because if we can expect multi-line fields it's + // nearly impossible to deduce where one row ends and another starts when diving into + // an arbitrary position in the file. + + CharSeeker dataStream = charSeeker( data.stream(), config, true ); + + // Read the header, given the data stream. This allows the header factory to be able to + // parse the header from the data stream directly. Or it can decide to grab the header + // from somewhere else, it's up to that factory. + Header dataHeader = headerFactory.create( dataStream, config, idType ); + + InputEntityDeserializer input = factory.create( dataStream, dataHeader, data.decorator() ); + // It's important that we assign currentInput before calling initialize(), so that if something + // goes wrong in initialize() and our close() is called we close it properly. + currentInput = input; + currentInputOpen = true; + input.initialize(); + } + else + { + // If the input fields aren't expected to contain multi-line fields we can do an optimization + // where we have one reader, reading chunks of data, handing over them to one or more parsing + // threads. The reader will read from its current position and N bytes ahead. When it gets there + // it will search backwards for the first new-line character and set the chunk end position + // to that position, effectively un-reading those characters back. This way each chunk will have + // complete rows of data and can be parsed individually by multiple threads. + + currentInput = new ParallelInputEntityDeserializer<>( data, headerFactory, config, idType, + maxProcessors, factory, entityClass ); + currentInput.processors( previousInputProcessors ); + currentInputOpen = true; + } + return currentInput; } @@ -80,14 +117,12 @@ private void closeCurrent() if ( currentInputOpen ) { previousInputsCollectivePositions += currentInput.position(); + previousInputProcessors = currentInput.processors( 0 ); currentInput.close(); currentInputOpen = false; } } - protected abstract InputEntityDeserializer entityDeserializer( CharSeeker dataStream, Header dataHeader, - Function decorator ); - @Override public void close() { @@ -111,4 +146,10 @@ public long lineNumber() { return currentInput.lineNumber(); } + + @Override + public int processors( int delta ) + { + return currentInput.processors( delta ); + } } diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/input/csv/ParallelInputEntityDeserializer.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/input/csv/ParallelInputEntityDeserializer.java new file mode 100644 index 0000000000000..326ea92d3e2c5 --- /dev/null +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/input/csv/ParallelInputEntityDeserializer.java @@ -0,0 +1,216 @@ +/* + * Copyright (c) 2002-2016 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. 
+ *
+ * Neo4j is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program.  If not, see <http://www.gnu.org/licenses/>.
+ */
+package org.neo4j.unsafe.impl.batchimport.input.csv;
+
+import java.io.IOException;
+import java.lang.reflect.Array;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.function.Supplier;
+
+import org.neo4j.csv.reader.BufferedCharSeeker;
+import org.neo4j.csv.reader.CharSeeker;
+import org.neo4j.csv.reader.ProcessingSource;
+import org.neo4j.csv.reader.SourceTraceability;
+import org.neo4j.csv.reader.Source.Chunk;
+import org.neo4j.helpers.Exceptions;
+import org.neo4j.helpers.collection.PrefetchingIterator;
+import org.neo4j.kernel.impl.util.collection.ContinuableArrayCursor;
+import org.neo4j.unsafe.impl.batchimport.InputIterator;
+import org.neo4j.unsafe.impl.batchimport.executor.TaskExecutionPanicException;
+import org.neo4j.unsafe.impl.batchimport.input.InputEntity;
+import org.neo4j.unsafe.impl.batchimport.input.InputException;
+import org.neo4j.unsafe.impl.batchimport.input.InputNode;
+import org.neo4j.unsafe.impl.batchimport.input.InputRelationship;
+import org.neo4j.unsafe.impl.batchimport.input.csv.InputGroupsDeserializer.DeserializerFactory;
+import org.neo4j.unsafe.impl.batchimport.staging.TicketedProcessing;
+
+import static org.neo4j.csv.reader.Source.singleChunk;
+
+/**
+ * Deserializes CSV into {@link InputNode} and {@link InputRelationship} and does so by reading characters
+ * in a dedicated thread while letting one or more threads parse the data. This can only safely be used if
+ * {@link Configuration#multilineFields()} is {@code false}. Initially only one parsing thread is assigned,
+ * more can be assigned at any point in time using {@link #processors(int)}.
+ *
+ * This class accepts {@link DeserializerFactory}, which normally instantiates {@link InputEntityDeserializer}
+ * instances.
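+ *
+ * A rough usage sketch follows ({@code data}, {@code headerFactory}, {@code config}, {@code idType} and
+ * {@code factory} are assumed to be the same arguments that {@link CsvInput} already builds for
+ * {@link InputGroupsDeserializer}; this is an illustration, not a prescribed API):
+ * <pre>{@code
+ * ParallelInputEntityDeserializer<InputNode> nodes = new ParallelInputEntityDeserializer<>(
+ *         data, headerFactory, config, idType, maxProcessors, factory, InputNode.class );
+ * while ( nodes.hasNext() )
+ * {
+ *     InputNode node = nodes.next(); // chunks are parsed in parallel, entities come back in source order
+ * }
+ * nodes.close();
+ * }</pre>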
+ *
+ * @param <ENTITY> type of {@link InputEntity} to deserialize into
+ */
+public class ParallelInputEntityDeserializer<ENTITY extends InputEntity> extends InputIterator.Adapter<ENTITY>
+{
+    private final ProcessingSource source;
+    private final TicketedProcessing<CharSeeker,ENTITY[]> processing;
+    private final ContinuableArrayCursor<ENTITY> cursor;
+    private SourceTraceability last = SourceTraceability.EMPTY;
+
+    @SuppressWarnings( "unchecked" )
+    public ParallelInputEntityDeserializer( Data<ENTITY> data, Header.Factory headerFactory, Configuration config,
+            IdType idType, int maxProcessors, DeserializerFactory<ENTITY> factory, Class<ENTITY> entityClass )
+    {
+        // Reader of chunks, characters aligning to nearest newline
+        source = new ProcessingSource( data.stream(), config.bufferSize(), maxProcessors );
+        try
+        {
+            // Read first chunk explicitly here since it contains the header
+            Chunk firstChunk = source.nextChunk();
+            if ( firstChunk.length() == 0 )
+            {
+                throw new InputException( "No header defined" );
+            }
+            CharSeeker firstSeeker = new BufferedCharSeeker( singleChunk( firstChunk ), config );
+            Header dataHeader = headerFactory.create( firstSeeker, config, idType );
+
+            // Initialize the processing logic for parsing the data in the first chunk, as well as in all other chunks
+            processing = new TicketedProcessing<>( "Parallel input parser", maxProcessors, (seeker, header) ->
+            {
+                InputEntityDeserializer<ENTITY> chunkDeserializer = factory.create( seeker, header, data.decorator() );
+                chunkDeserializer.initialize();
+                List<ENTITY> entities = new ArrayList<>();
+                while ( chunkDeserializer.hasNext() )
+                {
+                    ENTITY next = chunkDeserializer.next();
+                    entities.add( next );
+                }
+                return entities.toArray( (ENTITY[]) Array.newInstance( entityClass, entities.size() ) );
+            },
+            () -> dataHeader.clone() /*We need to clone the stateful header to each processing thread*/ );
+
+            // Utility cursor which takes care of moving over processed results from chunk to chunk
+            cursor = new ContinuableArrayCursor<>( rebaseBatches( processing ) );
+
+            // Start an asynchronous slurp of the chunks fed directly into the processors
+            processing.slurp( seekers( firstSeeker, source, config ), true );
+        }
+        catch ( IOException e )
+        {
+            throw new InputException( "Couldn't read first chunk from input", e );
+        }
+    }
+
+    @Override
+    protected ENTITY fetchNextOrNull()
+    {
+        boolean hasNext;
+        try
+        {
+            hasNext = cursor.next();
+        }
+        catch ( TaskExecutionPanicException e )
+        {
+            // Getting this exception here means that a processor got an exception and put
+            // the executor in panic mode. The user would like to see the actual exception
+            // so we're going to do a little thing here where we take the cause of this
+            // IllegalStateException and throw it, since this ISE is just a wrapper.
+            throw Exceptions.launderedException( e.getCause() );
+        }
+
+        if ( hasNext )
+        {
+            ENTITY next = cursor.get();
+            // We keep a reference to the last fetched so that the methods from SourceTraceability can
+            // be implemented and executed correctly.
+            last = next;
+            return next;
+        }
+        return null;
+    }
+
+    private static <ENTITY extends InputEntity> Supplier<ENTITY[]> rebaseBatches(
+            TicketedProcessing<CharSeeker,ENTITY[]> processing )
+    {
+        return () -> processing.next();
+    }
+
+    private static Iterator<CharSeeker> seekers( CharSeeker firstSeeker, ProcessingSource source, Configuration config )
+    {
+        return new PrefetchingIterator<CharSeeker>()
+        {
+            private boolean firstReturned;
+
+            @Override
+            protected CharSeeker fetchNextOrNull()
+            {
+                // We have the first here explicitly since we read it before starting the general processing
+                // and extract the header.
We want to read the data in it as well and that's why we get it here + if ( !firstReturned ) + { + firstReturned = true; + return firstSeeker; + } + + // Continue read the next chunk from the source file(s) + try + { + Chunk chunk = source.nextChunk(); + return chunk.length() > 0 ? new BufferedCharSeeker( singleChunk( chunk ), config ) : null; + } + catch ( IOException e ) + { + throw new InputException( "Couldn't get chunk from source", e ); + } + } + }; + } + + @Override + public void close() + { + processing.shutdown( true ); + try + { + source.close(); + } + catch ( IOException e ) + { + throw new InputException( "Couldn't close source of data chunks", e ); + } + finally + { + super.close(); + } + } + + @Override + public int processors( int delta ) + { + return processing.processors( delta ); + } + + @Override + public String sourceDescription() + { + return last.sourceDescription(); + } + + @Override + public long lineNumber() + { + return last.lineNumber(); + } + + @Override + public long position() + { + return last.position(); + } +} diff --git a/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/input/csv/CsvInputBatchImportIT.java b/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/input/csv/CsvInputBatchImportIT.java index af629a88f292c..903d3559e21b7 100644 --- a/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/input/csv/CsvInputBatchImportIT.java +++ b/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/input/csv/CsvInputBatchImportIT.java @@ -66,6 +66,7 @@ import org.neo4j.unsafe.impl.batchimport.input.InputNode; import org.neo4j.unsafe.impl.batchimport.input.InputRelationship; +import static java.lang.Runtime.getRuntime; import static java.lang.String.format; import static java.lang.System.currentTimeMillis; import static org.hamcrest.Matchers.greaterThanOrEqualTo; @@ -97,7 +98,8 @@ public void shouldImportDataComingFromCsvFiles() throws Exception try { importer.doImport( csv( nodeDataAsFile( nodeData ), relationshipDataAsFile( relationshipData ), - IdType.STRING, lowBufferSize( COMMAS ), silentBadCollector( 0 ) ) ); + IdType.STRING, lowBufferSize( COMMAS ), silentBadCollector( 0 ), + getRuntime().availableProcessors() ) ); // THEN verifyImportedData( nodeData, relationshipData ); success = true; diff --git a/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/input/csv/CsvInputTest.java b/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/input/csv/CsvInputTest.java index 9ddacfc5a310b..7253cefb0e2cd 100644 --- a/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/input/csv/CsvInputTest.java +++ b/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/input/csv/CsvInputTest.java @@ -27,6 +27,7 @@ import org.junit.Rule; import org.junit.Test; +import org.neo4j.csv.reader.CharReadable; import org.neo4j.csv.reader.CharSeeker; import org.neo4j.csv.reader.CharSeekers; import org.neo4j.csv.reader.Extractor; @@ -59,6 +60,9 @@ import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; + +import static java.lang.Runtime.getRuntime; + import static org.neo4j.csv.reader.Readables.wrap; import static org.neo4j.helpers.ArrayUtil.union; import static org.neo4j.helpers.collection.Iterators.asSet; @@ -85,7 +89,8 @@ public void shouldProvideNodesFromCsvInput() throws Exception header( entry( null, Type.ID, idType.extractor( extractors ) ), entry( "name", Type.PROPERTY, extractors.string() ), entry( "labels", Type.LABEL, 
extractors.string() ) ), - null, null, idType, COMMAS, silentBadCollector( 0 ) ); + null, null, idType, COMMAS, silentBadCollector( 0 ), + getRuntime().availableProcessors() ); // WHEN/THEN Iterator nodes = input.nodes().iterator(); @@ -107,7 +112,8 @@ public void shouldProvideRelationshipsFromCsvInput() throws Exception entry( "to", Type.END_ID, idType.extractor( extractors ) ), entry( "type", Type.TYPE, extractors.string() ), entry( "since", Type.PROPERTY, extractors.long_() ) ), idType, COMMAS, - silentBadCollector( 0 ) ); + silentBadCollector( 0 ), + getRuntime().availableProcessors() ); // WHEN/THEN Iterator relationships = input.relationships().iterator(); @@ -119,8 +125,8 @@ public void shouldProvideRelationshipsFromCsvInput() throws Exception public void shouldCloseDataIteratorsInTheEnd() throws Exception { // GIVEN - CharSeeker nodeData = spy( charSeeker( "1" ) ); - CharSeeker relationshipData = spy( charSeeker( "1,1" ) ); + CharReadable nodeData = spy( charReader( "1" ) ); + CharReadable relationshipData = spy( charReader( "1,1" ) ); IdType idType = IdType.STRING; Iterable> nodeDataIterable = dataIterable( given( nodeData ) ); Iterable> relationshipDataIterable = @@ -131,7 +137,7 @@ nodeDataIterable, header( relationshipDataIterable, header( entry( null, Type.START_ID, idType.extractor( extractors ) ), entry( null, Type.END_ID, idType.extractor( extractors ) ) ), - idType, COMMAS, silentBadCollector( 0 ) ); + idType, COMMAS, silentBadCollector( 0 ), getRuntime().availableProcessors() ); // WHEN try ( ResourceIterator iterator = input.nodes().iterator() ) @@ -162,7 +168,8 @@ public void shouldCopeWithLinesThatHasTooFewValuesButStillValidates() throws Exc entry( "unit", Type.PROPERTY, extractors.string() ), entry( "type", Type.LABEL, extractors.string() ), entry( "kills", Type.PROPERTY, extractors.int_() ) ), - null, null, IdType.ACTUAL, Configuration.COMMAS, silentBadCollector( 0 ) ); + null, null, IdType.ACTUAL, Configuration.COMMAS, silentBadCollector( 0 ), + getRuntime().availableProcessors() ); // WHEN try ( ResourceIterator nodes = input.nodes().iterator() ) @@ -186,7 +193,8 @@ public void shouldIgnoreValuesAfterHeaderEntries() throws Exception header( entry( null, Type.ID, extractors.long_() ), entry( "name", Type.PROPERTY, extractors.string() ) ), - null, null, IdType.ACTUAL, Configuration.COMMAS, silentBadCollector( 4 ) ); + null, null, IdType.ACTUAL, Configuration.COMMAS, silentBadCollector( 4 ), + getRuntime().availableProcessors() ); // WHEN try ( ResourceIterator nodes = input.nodes().iterator() ) @@ -211,7 +219,8 @@ public void shouldHandleMultipleInputGroups() throws Exception Iterable> data = dataIterable( group1, group2 ); Input input = new CsvInput( data, defaultFormatNodeFileHeader(), null, null, - IdType.STRING, Configuration.COMMAS, silentBadCollector( 0 ) ); + IdType.STRING, Configuration.COMMAS, silentBadCollector( 0 ), + getRuntime().availableProcessors() ); // WHEN iterating over them, THEN the expected data should come out ResourceIterator nodes = input.nodes().iterator(); @@ -234,7 +243,8 @@ public void shouldProvideAdditiveLabels() throws Exception additiveLabels( addedLabels ) ); Iterable> dataIterable = dataIterable( data ); Input input = new CsvInput( dataIterable, defaultFormatNodeFileHeader(), - null, null, IdType.ACTUAL, Configuration.COMMAS, silentBadCollector( 0 ) ); + null, null, IdType.ACTUAL, Configuration.COMMAS, silentBadCollector( 0 ), + getRuntime().availableProcessors() ); // WHEN/THEN try ( ResourceIterator nodes = 
input.nodes().iterator() ) @@ -263,7 +273,7 @@ public void shouldProvideDefaultRelationshipType() throws Exception Iterable> dataIterable = dataIterable( data ); Input input = new CsvInput( null, null, dataIterable, defaultFormatRelationshipFileHeader(), IdType.ACTUAL, Configuration.COMMAS, - silentBadCollector( 0 ) ); + silentBadCollector( 0 ), getRuntime().availableProcessors() ); // WHEN/THEN try ( ResourceIterator relationships = input.relationships().iterator() ) @@ -286,14 +296,14 @@ public void shouldFailOnMissingRelationshipType() throws Exception Iterable> dataIterable = dataIterable( data ); Input input = new CsvInput( null, null, dataIterable, defaultFormatRelationshipFileHeader(), IdType.ACTUAL, Configuration.COMMAS, - silentBadCollector( 0 ) ); + silentBadCollector( 0 ), getRuntime().availableProcessors() ); // WHEN/THEN try ( ResourceIterator relationships = input.relationships().iterator() ) { - assertRelationship( relationships.next(), 0L, 1L, type, NO_PROPERTIES ); try { + assertRelationship( relationships.next(), 0L, 1L, type, NO_PROPERTIES ); relationships.next(); fail( "Should have failed" ); } @@ -314,7 +324,7 @@ public void shouldAllowNodesWithoutIdHeader() throws Exception "Johan,2\n" ); Iterable> dataIterable = dataIterable( data ); Input input = new CsvInput( dataIterable, defaultFormatNodeFileHeader(), null, null, IdType.STRING, COMMAS, - silentBadCollector( 0 ) ); + silentBadCollector( 0 ), getRuntime().availableProcessors() ); // WHEN try ( ResourceIterator nodes = input.nodes().iterator() ) @@ -336,7 +346,7 @@ public void shouldAllowSomeNodesToBeAnonymous() throws Exception ",Johan,2\n" ); // this node is anonymous Iterable> dataIterable = dataIterable( data ); Input input = new CsvInput( dataIterable, defaultFormatNodeFileHeader(), null, null, IdType.STRING, COMMAS, - silentBadCollector( 0 ) ); + silentBadCollector( 0 ), getRuntime().availableProcessors() ); // WHEN try ( ResourceIterator nodes = input.nodes().iterator() ) @@ -358,7 +368,7 @@ public void shouldAllowNodesToBeAnonymousEvenIfIdHeaderIsNamed() throws Exceptio ",Johan,2\n" ); // this node is anonymous Iterable> dataIterable = dataIterable( data ); Input input = new CsvInput( dataIterable, defaultFormatNodeFileHeader(), null, null, IdType.STRING, COMMAS, - silentBadCollector( 0 ) ); + silentBadCollector( 0 ), getRuntime().availableProcessors() ); // WHEN try ( ResourceIterator nodes = input.nodes().iterator() ) @@ -380,7 +390,7 @@ public void shouldHaveIdSetAsPropertyIfIdHeaderEntryIsNamed() throws Exception "def,Johan,2\n" ); // this node is anonymous Iterable> dataIterable = dataIterable( data ); Input input = new CsvInput( dataIterable, defaultFormatNodeFileHeader(), null, null, IdType.STRING, COMMAS, - silentBadCollector( 0 ) ); + silentBadCollector( 0 ), getRuntime().availableProcessors() ); // WHEN try ( ResourceIterator nodes = input.nodes().iterator() ) @@ -402,7 +412,7 @@ public void shouldNotHaveIdSetAsPropertyIfIdHeaderEntryIsNamedForActualIds() thr "1,Johan,2\n" ); // this node is anonymous Iterable> dataIterable = dataIterable( data ); Input input = new CsvInput( dataIterable, defaultFormatNodeFileHeader(), null, null, IdType.ACTUAL, COMMAS, - silentBadCollector( 0 ) ); + silentBadCollector( 0 ), getRuntime().availableProcessors() ); // WHEN try ( ResourceIterator nodes = input.nodes().iterator() ) @@ -424,7 +434,7 @@ public void shouldIgnoreEmptyPropertyValues() throws Exception "1,Johan,Additional\n" ); Iterable> dataIterable = dataIterable( data ); Input input = new CsvInput( 
dataIterable, defaultFormatNodeFileHeader(), null, null, IdType.ACTUAL, COMMAS, - silentBadCollector( 0 ) ); + silentBadCollector( 0 ), getRuntime().availableProcessors() ); // WHEN try ( ResourceIterator nodes = input.nodes().iterator() ) @@ -446,7 +456,7 @@ public void shouldIgnoreEmptyIntPropertyValues() throws Exception "1,Johan,10\n" ); Iterable> dataIterable = dataIterable( data ); Input input = new CsvInput( dataIterable, defaultFormatNodeFileHeader(), null, null, IdType.ACTUAL, COMMAS, - silentBadCollector( 0 ) ); + silentBadCollector( 0 ), getRuntime().availableProcessors() ); // WHEN try ( ResourceIterator nodes = input.nodes().iterator() ) @@ -465,7 +475,7 @@ public void shouldFailOnArrayDelimiterBeingSameAsDelimiter() throws Exception try { new CsvInput( null, null, null, null, IdType.ACTUAL, customConfig( ',', ',', '"' ), - silentBadCollector( 0 ) ); + silentBadCollector( 0 ), getRuntime().availableProcessors() ); fail( "Should not be possible" ); } catch ( IllegalArgumentException e ) @@ -482,7 +492,7 @@ public void shouldFailOnQuotationCharacterBeingSameAsDelimiter() throws Exceptio try { new CsvInput( null, null, null, null, IdType.ACTUAL, customConfig( ',', ';', ',' ), - silentBadCollector( 0 ) ); + silentBadCollector( 0 ), getRuntime().availableProcessors() ); fail( "Should not be possible" ); } catch ( IllegalArgumentException e ) @@ -500,7 +510,7 @@ public void shouldFailOnQuotationCharacterBeingSameAsArrayDelimiter() throws Exc try { new CsvInput( null, null, null, null, IdType.ACTUAL, customConfig( ',', ';', ';' ), - silentBadCollector( 0 ) ); + silentBadCollector( 0 ), getRuntime().availableProcessors() ); fail( "Should not be possible" ); } catch ( IllegalArgumentException e ) @@ -526,7 +536,7 @@ public void shouldHaveNodesBelongToGroupSpecifiedInHeader() throws Exception header( entry( null, Type.ID, group.name(), idType.extractor( extractors ) ), entry( "name", Type.PROPERTY, extractors.string() ) ), null, null, idType, COMMAS, - silentBadCollector( 0 ) ); + silentBadCollector( 0 ), getRuntime().availableProcessors() ); // WHEN/THEN Iterator nodes = input.nodes().iterator(); @@ -552,7 +562,7 @@ public void shouldHaveRelationshipsSpecifyStartEndNodeIdGroupsInHeader() throws entry( null, Type.TYPE, extractors.string() ), entry( null, Type.END_ID, endNodeGroup.name(), idType.extractor( extractors ) ) ), idType, COMMAS, - silentBadCollector( 0 ) ); + silentBadCollector( 0 ), getRuntime().availableProcessors() ); // WHEN/THEN Iterator relationships = input.relationships().iterator(); @@ -573,7 +583,7 @@ public void shouldDoWithoutRelationshipTypeHeaderIfDefaultSupplied() throws Exce Iterable> dataIterable = dataIterable( data ); Input input = new CsvInput( null, null, dataIterable, defaultFormatRelationshipFileHeader(), IdType.ACTUAL, COMMAS, - silentBadCollector( 0 ) ); + silentBadCollector( 0 ), getRuntime().availableProcessors() ); // WHEN try ( ResourceIterator relationships = input.relationships().iterator() ) @@ -596,14 +606,14 @@ public void shouldIncludeDataSourceInformationOnBadFieldValueOrLine() throws Exc "3,Emil,12" ) ); Input input = new CsvInput( data, DataFactories.defaultFormatNodeFileHeader(), null, null, IdType.INTEGER, Configuration.COMMAS, - silentBadCollector( 0 ) ); + silentBadCollector( 0 ), getRuntime().availableProcessors() ); // WHEN try ( InputIterator nodes = input.nodes().iterator() ) { - assertNode( nodes.next(), 1L, new Object[] {"name", "Mattias", "other", 10}, labels() ); try { + assertNode( nodes.next(), 1L, new Object[] {"name", 
"Mattias", "other", 10}, labels() ); nodes.next(); fail( "Should have failed" ); } @@ -626,7 +636,7 @@ public void shouldIgnoreNodeEntriesMarkedIgnoreUsingHeader() throws Exception "2,Johan,111,Person\n" + "3,Emil,12,Person" ) ); Input input = new CsvInput( data, defaultFormatNodeFileHeader(), null, null, IdType.INTEGER, COMMAS, - silentBadCollector( 0 ) ); + silentBadCollector( 0 ), getRuntime().availableProcessors() ); // WHEN try ( InputIterator nodes = input.nodes().iterator() ) @@ -648,7 +658,7 @@ public void shouldIgnoreRelationshipEntriesMarkedIgnoreUsingHeader() throws Exce "2,KNOWS,3,Johan,111\n" + "3,KNOWS,4,Emil,12" ) ); Input input = new CsvInput( null, null, data, defaultFormatRelationshipFileHeader(), IdType.INTEGER, COMMAS, - silentBadCollector( 0 ) ); + silentBadCollector( 0 ), getRuntime().availableProcessors() ); // WHEN try ( InputIterator relationships = input.relationships().iterator() ) @@ -669,7 +679,7 @@ public void shouldPropagateExceptionFromFailingDecorator() throws Exception DataFactories.nodeData( CsvInputTest.data( ":ID,name\n1,Mattias", new FailingNodeDecorator( failure ) ) ); Input input = new CsvInput( data, defaultFormatNodeFileHeader(), null, null, IdType.INTEGER, COMMAS, - silentBadCollector( 0 ) ); + silentBadCollector( 0 ), getRuntime().availableProcessors() ); // WHEN try ( InputIterator nodes = input.nodes().iterator() ) @@ -693,7 +703,7 @@ public void shouldNotIncludeEmptyArraysInEntities() throws Exception "2,a;b,10;20" ) ); Input input = new CsvInput( data, defaultFormatNodeFileHeader(), null, null, IdType.INTEGER, COMMAS, - silentBadCollector( 0 ) ); + silentBadCollector( 0 ), getRuntime().availableProcessors() ); // WHEN/THEN try ( InputIterator nodes = input.nodes().iterator() ) @@ -713,7 +723,7 @@ public void shouldFailOnRelationshipWithMissingStartIdField() throws Exception ":START_ID,:END_ID,:TYPE\n" + ",1," ) ); Input input = new CsvInput( null, null, data, defaultFormatRelationshipFileHeader(), IdType.INTEGER, COMMAS, - silentBadCollector( 0 ) ); + silentBadCollector( 0 ), getRuntime().availableProcessors() ); // WHEN try ( InputIterator relationships = input.relationships().iterator() ) @@ -736,7 +746,7 @@ public void shouldFailOnRelationshipWithMissingEndIdField() throws Exception ":START_ID,:END_ID,:TYPE\n" + "1,," ) ); Input input = new CsvInput( null, null, data, defaultFormatRelationshipFileHeader(), IdType.INTEGER, COMMAS, - silentBadCollector( 0 ) ); + silentBadCollector( 0 ), getRuntime().availableProcessors() ); // WHEN try ( InputIterator relationships = input.relationships().iterator() ) @@ -767,7 +777,7 @@ public boolean emptyQuotedStringsAsNull() } }; Input input = new CsvInput( data, defaultFormatNodeFileHeader(), - null, null, IdType.INTEGER, config, silentBadCollector( 0 ) ); + null, null, IdType.INTEGER, config, silentBadCollector( 0 ), getRuntime().availableProcessors() ); // WHEN try ( InputIterator nodes = input.nodes().iterator() ) @@ -791,7 +801,7 @@ public void shouldIgnoreEmptyExtraColumns() throws Exception // WHEN Collector collector = mock( Collector.class ); Input input = new CsvInput( data, defaultFormatNodeFileHeader(), - null, null, IdType.INTEGER, COMMAS, collector ); + null, null, IdType.INTEGER, COMMAS, collector, getRuntime().availableProcessors() ); // THEN try ( InputIterator nodes = input.nodes().iterator() ) @@ -830,24 +840,24 @@ public char arrayDelimiter() }; } - private DataFactory given( final CharSeeker data ) + private DataFactory given( final CharReadable data ) { return config -> dataItem( 
@@ -925,7 +935,7 @@ private static DataFactory data( final Stri
     private static DataFactory<InputNode> data( final String data,
             final Function<InputNode,InputNode> decorator )
     {
-        return config -> dataItem( charSeeker( data ), decorator );
+        return config -> dataItem( charReader( data ), decorator );
     }
 
     private static final org.neo4j.csv.reader.Configuration SEEKER_CONFIG =
@@ -940,7 +950,12 @@ public int bufferSize()
     private static CharSeeker charSeeker( String data )
     {
-        return CharSeekers.charSeeker( wrap( new StringReader( data ) ), SEEKER_CONFIG, false );
+        return CharSeekers.charSeeker( charReader( data ), SEEKER_CONFIG, false );
+    }
+
+    private static CharReadable charReader( String data )
+    {
+        return wrap( new StringReader( data ) );
     }
 
     @SuppressWarnings( { "rawtypes", "unchecked" } )
diff --git a/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/input/csv/DataFactoriesTest.java b/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/input/csv/DataFactoriesTest.java
index 4e49298ab319d..c18da6f4aadac 100644
--- a/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/input/csv/DataFactoriesTest.java
+++ b/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/input/csv/DataFactoriesTest.java
@@ -194,7 +194,7 @@ public void shouldParseHeaderFromFirstLineOfFirstInputFile() throws Exception
         Extractors extractors = new Extractors( ';' );
 
         // WHEN
-        CharSeeker seeker = dataFactory.create( TABS ).stream();
+        CharSeeker seeker = CharSeekers.charSeeker( dataFactory.create( TABS ).stream(), TABS, false );
         Header header = headerFactory.create( seeker, TABS, IdType.ACTUAL );
 
         // THEN
diff --git a/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/input/csv/InputGroupsDeserializerTest.java b/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/input/csv/InputGroupsDeserializerTest.java
index ab21b8c779b4b..54f00daa38d3e 100644
--- a/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/input/csv/InputGroupsDeserializerTest.java
+++ b/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/input/csv/InputGroupsDeserializerTest.java
@@ -24,9 +24,7 @@
 import java.io.StringReader;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.function.Function;
-
-import org.neo4j.csv.reader.CharSeeker;
+import java.util.concurrent.atomic.AtomicReference;
 
 import org.neo4j.function.Suppliers;
 import org.neo4j.unsafe.impl.batchimport.input.InputNode;
@@ -50,31 +48,29 @@ public void shouldBeAbleToAskForSourceInformationEvenBetweenTwoSources() throws
         // GIVEN
         List<DataFactory<InputNode>> data = asList( data( ":ID\n1" ), data( "2" ) );
         final AtomicInteger flips = new AtomicInteger();
-        InputGroupsDeserializer<InputNode> deserializer = new InputGroupsDeserializer<InputNode>(
-                data.iterator(), defaultFormatNodeFileHeader(), lowBufferSize( COMMAS ), INTEGER )
-        {
-            @Override
-            protected InputEntityDeserializer<InputNode> entityDeserializer( CharSeeker dataStream, Header dataHeader,
-                    Function<InputNode,InputNode> decorator )
-            {
-                // This is the point where the currentInput field in InputGroupsDeserializer was null
-                // so ensure that's no longer the case, just by poking those source methods right here and now.
-                if ( flips.get() == 0 )
-                {
-                    assertNotNull( sourceDescription() );
-                }
-                else
+        final AtomicReference<InputGroupsDeserializer<InputNode>> deserializerTestHack = new AtomicReference<>( null );
+        InputGroupsDeserializer<InputNode> deserializer = new InputGroupsDeserializer<>(
+                data.iterator(), defaultFormatNodeFileHeader(), lowBufferSize( COMMAS ), INTEGER,
+                Runtime.getRuntime().availableProcessors(), (stream,header,decorator) ->
                 {
-                    assertEquals( "" + flips.get(), sourceDescription() );
-                }
+                    // This is the point where the currentInput field in InputGroupsDeserializer used to be null,
+                    // so ensure that's no longer the case simply by calling the source methods right here and now.
+                    if ( flips.get() == 0 )
+                    {
+                        assertNotNull( deserializerTestHack.get().sourceDescription() );
+                    }
+                    else
+                    {
+                        assertEquals( "" + flips.get(), deserializerTestHack.get().sourceDescription() );
+                    }
 
-                flips.incrementAndGet();
-                @SuppressWarnings( "unchecked" )
-                InputEntityDeserializer<InputNode> result = mock( InputEntityDeserializer.class );
-                when( result.sourceDescription() ).thenReturn( String.valueOf( flips.get() ) );
-                return result;
-            }
-        };
+                    flips.incrementAndGet();
+                    @SuppressWarnings( "unchecked" )
+                    InputEntityDeserializer<InputNode> result = mock( InputEntityDeserializer.class );
+                    when( result.sourceDescription() ).thenReturn( String.valueOf( flips.get() ) );
+                    return result;
+                }, InputNode.class );
+        deserializerTestHack.set( deserializer );
 
         // WHEN running through the iterator
         count( deserializer );
@@ -92,6 +88,12 @@ public int bufferSize()
             {
                 return 100;
             }
+
+            @Override
+            public boolean multilineFields()
+            {
+                return true;
+            }
         };
     }
 
diff --git a/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/input/csv/ParallelInputEntityDeserializerTest.java b/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/input/csv/ParallelInputEntityDeserializerTest.java
new file mode 100644
index 0000000000000..89d8d94e281d9
--- /dev/null
+++ b/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/input/csv/ParallelInputEntityDeserializerTest.java
@@ -0,0 +1,128 @@
+/*
+ * Copyright (c) 2002-2016 "Neo Technology,"
+ * Network Engine for Objects in Lund AB [http://neotechnology.com]
+ *
+ * This file is part of Neo4j.
+ *
+ * Neo4j is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+package org.neo4j.unsafe.impl.batchimport.input.csv;
+
+import org.junit.Rule;
+import org.junit.Test;
+
+import java.io.StringReader;
+import java.util.Set;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.function.Function;
+
+import org.neo4j.csv.reader.CharReadable;
+import org.neo4j.kernel.impl.util.Validators;
+import org.neo4j.test.RandomRule;
+import org.neo4j.unsafe.impl.batchimport.input.Collector;
+import org.neo4j.unsafe.impl.batchimport.input.Groups;
+import org.neo4j.unsafe.impl.batchimport.input.InputNode;
+import org.neo4j.unsafe.impl.batchimport.input.csv.InputGroupsDeserializer.DeserializerFactory;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+
+import static org.neo4j.csv.reader.Readables.wrap;
+import static org.neo4j.unsafe.impl.batchimport.input.csv.Configuration.COMMAS;
+import static org.neo4j.unsafe.impl.batchimport.input.csv.DataFactories.defaultFormatNodeFileHeader;
+import static org.neo4j.unsafe.impl.batchimport.input.csv.IdType.ACTUAL;
+
+public class ParallelInputEntityDeserializerTest
+{
+    @Rule
+    public final RandomRule random = new RandomRule().withSeed( 1468928804595L );
+
+    @Test
+    public void shouldParseDataInParallel() throws Exception
+    {
+        // GIVEN
+        int entities = 500;
+        Data<InputNode> data = testData( entities );
+        Configuration config = new Configuration.Overriden( COMMAS )
+        {
+            @Override
+            public int bufferSize()
+            {
+                return 100;
+            }
+        };
+        IdType idType = ACTUAL;
+        Collector badCollector = mock( Collector.class );
+        Groups groups = new Groups();
+        Set<Thread> observedProcessingThreads = new CopyOnWriteArraySet<>();
+        int threads = 4;
+        DeserializerFactory<InputNode> deserializerFactory = (chunk,header,decorator) ->
+        {
+            observedProcessingThreads.add( Thread.currentThread() );
+            // Block until all threads have been observed, proving that parsing actually happens in parallel
+            while ( observedProcessingThreads.size() < threads );
+            return new InputEntityDeserializer<>( header, chunk, config.delimiter(),
+                    new InputNodeDeserialization( chunk, header, groups, idType.idsAreExternal() ), decorator,
+                    Validators.emptyValidator(), badCollector );
+        };
+        try ( ParallelInputEntityDeserializer<InputNode> deserializer = new ParallelInputEntityDeserializer<>( data,
+                defaultFormatNodeFileHeader(), config, idType, threads, deserializerFactory, InputNode.class ) )
+        {
+            deserializer.processors( threads );
+
+            // WHEN/THEN
+            for ( long i = 0; i < entities; i++ )
+            {
+                assertTrue( deserializer.hasNext() );
+                InputNode entity = deserializer.next();
+                assertEquals( i, ((Long) entity.id()).longValue() );
+                assertEquals( "name", entity.properties()[0] );
+                assertTrue( entity.properties()[1].toString().startsWith( i + "-" ) );
+            }
+            assertFalse( deserializer.hasNext() );
+            assertEquals( threads, observedProcessingThreads.size() );
+        }
+    }
+
+    private Data<InputNode> testData( int entities )
+    {
+        StringBuilder string = new StringBuilder();
+        string.append( ":ID,name\n" );
+        for ( int i = 0; i < entities; i++ )
+        {
+            string.append( i ).append( "," ).append( i + "-" + random.string() ).append( "\n" );
+        }
+        return data( string.toString() );
+    }
+
+    private Data<InputNode> data( String string )
+    {
+        return new Data<InputNode>()
+        {
+            @Override
+            public CharReadable stream()
+            {
+                return wrap( new StringReader( string ) );
+            }
+
+            @Override
+            public Function<InputNode,InputNode> decorator()
+            {
+                return item -> item;
+            }
+        };
+    }
+}
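
Editor's note: the new test above is currently the best usage reference for the parallel code path. Below is a minimal caller-side sketch, not part of the patch, built only from classes and signatures the test itself uses. The class name ParallelCsvSketch is made up, the sketch is placed in the same package as the csv input classes so package-private types resolve, and the import location of silentBadCollector (org.neo4j.unsafe.impl.batchimport.input.Collectors) is an assumption not confirmed by this diff.

    package org.neo4j.unsafe.impl.batchimport.input.csv;

    import java.io.StringReader;
    import java.util.function.Function;

    import org.neo4j.csv.reader.CharReadable;
    import org.neo4j.kernel.impl.util.Validators;
    import org.neo4j.unsafe.impl.batchimport.input.Collector;
    import org.neo4j.unsafe.impl.batchimport.input.Groups;
    import org.neo4j.unsafe.impl.batchimport.input.InputNode;
    import org.neo4j.unsafe.impl.batchimport.input.csv.InputGroupsDeserializer.DeserializerFactory;

    import static org.neo4j.csv.reader.Readables.wrap;
    import static org.neo4j.unsafe.impl.batchimport.input.Collectors.silentBadCollector; // assumed location
    import static org.neo4j.unsafe.impl.batchimport.input.csv.Configuration.COMMAS;
    import static org.neo4j.unsafe.impl.batchimport.input.csv.DataFactories.defaultFormatNodeFileHeader;
    import static org.neo4j.unsafe.impl.batchimport.input.csv.IdType.ACTUAL;

    public class ParallelCsvSketch
    {
        public static void main( String[] args ) throws Exception
        {
            // Plain csv content; with multi-line fields disallowed (the COMMAS default)
            // the parallel code path can chunk it at line boundaries.
            Data<InputNode> data = new Data<InputNode>()
            {
                @Override
                public CharReadable stream()
                {
                    return wrap( new StringReader( ":ID,name\n0,zero\n1,one\n2,two\n" ) );
                }

                @Override
                public Function<InputNode,InputNode> decorator()
                {
                    return node -> node;
                }
            };

            Groups groups = new Groups();
            Collector badCollector = silentBadCollector( 0 );
            // Same factory shape as in ParallelInputEntityDeserializerTest:
            // one InputEntityDeserializer per chunk, each run on a processing thread.
            DeserializerFactory<InputNode> factory = (chunk, header, decorator) ->
                    new InputEntityDeserializer<InputNode>( header, chunk, COMMAS.delimiter(),
                            new InputNodeDeserialization( chunk, header, groups, ACTUAL.idsAreExternal() ),
                            decorator, Validators.<InputNode>emptyValidator(), badCollector );

            try ( ParallelInputEntityDeserializer<InputNode> nodes = new ParallelInputEntityDeserializer<>(
                    data, defaultFormatNodeFileHeader(), COMMAS, ACTUAL, 4, factory, InputNode.class ) )
            {
                nodes.processors( 4 ); // request the processing threads, as the test does
                while ( nodes.hasNext() )
                {
                    System.out.println( nodes.next() ); // entities come back in source order
                }
            }
        }
    }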
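
The test asserts two things at once: four distinct threads do the parsing, yet entities still arrive in source order (ids 0..499 in sequence). For readers unfamiliar with that pattern, here is a tiny JDK-only illustration of the underlying idea; all names are hypothetical and nothing here is Neo4j API. Chunks are submitted to a pool as they are read, and order is restored simply by consuming the futures in submission order, regardless of which thread finishes first.

    import java.util.ArrayDeque;
    import java.util.Arrays;
    import java.util.List;
    import java.util.Queue;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;

    public class OrderedChunkProcessing
    {
        public static void main( String[] args ) throws Exception
        {
            ExecutorService pool = Executors.newFixedThreadPool( 4 );
            Queue<Future<String>> inFlight = new ArrayDeque<>();

            // Stand-ins for chunks of csv data, read sequentially by a single reader thread.
            List<String> chunks = Arrays.asList( "chunk-0", "chunk-1", "chunk-2", "chunk-3" );
            for ( String chunk : chunks )
            {
                inFlight.add( pool.submit( () -> chunk + " parsed by " + Thread.currentThread().getName() ) );
            }

            // Draining the futures in submission order merges results back in exactly
            // the order the chunks were read, no matter which thread finished first.
            while ( !inFlight.isEmpty() )
            {
                System.out.println( inFlight.poll().get() );
            }
            pool.shutdown();
        }
    }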