diff --git a/community/csv/src/main/java/org/neo4j/csv/reader/MultiReadable.java b/community/csv/src/main/java/org/neo4j/csv/reader/MultiReadable.java index e1973f9b3adf5..2309e979eb6b5 100644 --- a/community/csv/src/main/java/org/neo4j/csv/reader/MultiReadable.java +++ b/community/csv/src/main/java/org/neo4j/csv/reader/MultiReadable.java @@ -139,6 +139,14 @@ public int read( char[] into, int offset, int length ) throws IOException int read = current.read( into, offset + totalRead, length - totalRead ); if ( read == -1 ) { + if ( totalRead > 0 ) + { + // Something has been read, but we couldn't fulfill the request with the current source. + // Return what we've read so far so that we don't mix multiple sources into the same read, + // for source traceability reasons. + return totalRead; + } + if ( !goToNextSource() ) { break; @@ -151,13 +159,10 @@ public int read( char[] into, int offset, int length ) throws IOException requiresNewLine = false; } } - else + else if ( read > 0 ) { totalRead += read; - if ( read > 0 ) - { - checkNewLineRequirement( into, offset + totalRead - 1 ); - } + checkNewLineRequirement( into, offset + totalRead - 1 ); } } return totalRead; diff --git a/community/csv/src/test/java/org/neo4j/csv/reader/MultiReadableTest.java b/community/csv/src/test/java/org/neo4j/csv/reader/MultiReadableTest.java index 3fa1557b50e54..141d643350fb7 100644 --- a/community/csv/src/test/java/org/neo4j/csv/reader/MultiReadableTest.java +++ b/community/csv/src/test/java/org/neo4j/csv/reader/MultiReadableTest.java @@ -29,7 +29,6 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; - import static org.neo4j.csv.reader.Readables.wrap; public class MultiReadableTest @@ -106,6 +105,33 @@ public void shouldTrackAbsolutePosition() throws Exception assertFalse( buffer.hasAvailable() ); } + @Test + public void shouldNotCrossSourcesInOneRead() throws Exception + { + // given + String source1 = 
"abcdefghijklm"; + String source2 = "nopqrstuvwxyz"; + String[][] data = new String[][] { {source1}, {source2} }; + CharReadable readable = new MultiReadable( readerIteratorFromStrings( data, '\n' ) ); + + // when + char[] target = new char[source1.length() + source2.length() / 2]; + int read = readable.read( target, 0, target.length ); + + // then + assertEquals( source1.length() + 1/*added newline-char*/, read ); + + // and when + target = new char[source2.length()]; + read = readable.read( target, 0, target.length ); + + // then + assertEquals( source2.length(), read ); + + read = readable.read( target, 0, target.length ); + assertEquals( 1/*added newline-char*/, read ); + } + private static final Configuration CONFIG = new Configuration.Overridden( Configuration.DEFAULT ) { @Override diff --git a/community/dbms/src/main/java/org/neo4j/commandline/dbms/CsvImporter.java b/community/dbms/src/main/java/org/neo4j/commandline/dbms/CsvImporter.java index 75974c7d1d9c7..ea75ae8674c87 100644 --- a/community/dbms/src/main/java/org/neo4j/commandline/dbms/CsvImporter.java +++ b/community/dbms/src/main/java/org/neo4j/commandline/dbms/CsvImporter.java @@ -116,8 +116,8 @@ public void doImport() throws IOException badCollector, configuration.maxNumberOfProcessors(), !ignoreBadRelationships ); - ImportTool.doImport( outsideWorld.errorStream(), outsideWorld.errorStream(), storeDir, logsDir, reportFile, fs, - nodesFiles, relationshipsFiles, false, input, this.databaseConfig, badOutput, configuration ); + ImportTool.doImport( outsideWorld.errorStream(), outsideWorld.errorStream(), System.in, storeDir, logsDir, + reportFile, fs, nodesFiles, relationshipsFiles, false, input, this.databaseConfig, badOutput, configuration ); } private boolean isIgnoringSomething() diff --git a/community/import-tool/src/main/java/org/neo4j/tooling/ImportTool.java b/community/import-tool/src/main/java/org/neo4j/tooling/ImportTool.java index daef03cc90549..9b8a21260ba0f 100644 --- 
a/community/import-tool/src/main/java/org/neo4j/tooling/ImportTool.java +++ b/community/import-tool/src/main/java/org/neo4j/tooling/ImportTool.java @@ -20,8 +20,10 @@ package org.neo4j.tooling; import java.io.BufferedOutputStream; +import java.io.ByteArrayInputStream; import java.io.File; import java.io.IOException; +import java.io.InputStream; import java.io.OutputStream; import java.io.PrintStream; import java.nio.charset.Charset; @@ -85,6 +87,7 @@ import static org.neo4j.io.ByteUnit.mebiBytes; import static org.neo4j.io.fs.FileUtils.readTextFile; import static org.neo4j.kernel.configuration.Settings.parseLongWithUnit; +import static org.neo4j.kernel.impl.store.PropertyType.EMPTY_BYTE_ARRAY; import static org.neo4j.kernel.impl.util.Converters.withDefault; import static org.neo4j.unsafe.impl.batchimport.AdditionalInitialIds.EMPTY; import static org.neo4j.unsafe.impl.batchimport.Configuration.BAD_FILE_NAME; @@ -413,6 +416,7 @@ public static void main( String[] incomingArguments, boolean defaultSettingsSuit File badFile = null; Long maxMemory; Boolean defaultHighIO; + InputStream in; boolean success = false; try ( FileSystemAbstraction fs = new DefaultFileSystemAbstraction() ) @@ -470,8 +474,9 @@ public static void main( String[] incomingArguments, boolean defaultSettingsSuit relationshipData( inputEncoding, relationshipsFiles ), defaultFormatRelationshipFileHeader(), idType, csvConfiguration( args, defaultSettingsSuitableForTests ), badCollector, configuration.maxNumberOfProcessors(), !skipBadRelationships ); + in = defaultSettingsSuitableForTests ? 
new ByteArrayInputStream( EMPTY_BYTE_ARRAY ) : System.in; - doImport( out, err, storeDir, logsDir, badFile, fs, nodesFiles, relationshipsFiles, + doImport( out, err, in, storeDir, logsDir, badFile, fs, nodesFiles, relationshipsFiles, enableStacktrace, input, dbConfig, badOutput, configuration ); success = true; @@ -540,7 +545,7 @@ static Long parseMaxMemory( String maxMemoryString ) return null; } - public static void doImport( PrintStream out, PrintStream err, File storeDir, File logsDir, File badFile, + public static void doImport( PrintStream out, PrintStream err, InputStream in, File storeDir, File logsDir, File badFile, FileSystemAbstraction fs, Collection> nodesFiles, Collection> relationshipsFiles, boolean enableStacktrace, Input input, Config dbConfig, OutputStream badOutput, @@ -559,7 +564,7 @@ public static void doImport( PrintStream out, PrintStream err, File storeDir, Fi null, // no external page cache configuration, logService, - ExecutionMonitors.defaultVisible(), + ExecutionMonitors.defaultVisible( in ), EMPTY, dbConfig, RecordFormatSelector.selectForConfig( dbConfig, logService.getInternalLogProvider() ) ); diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/ExecutionMonitors.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/ExecutionMonitors.java index 41edc0bc21027..df79114047ecf 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/ExecutionMonitors.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/ExecutionMonitors.java @@ -19,6 +19,8 @@ */ package org.neo4j.unsafe.impl.batchimport.staging; +import java.io.InputStream; + /** * Common {@link ExecutionMonitor} implementations. 
*/ @@ -30,11 +32,16 @@ private ExecutionMonitors() } public static ExecutionMonitor defaultVisible() + { + return defaultVisible( System.in ); + } + + public static ExecutionMonitor defaultVisible( InputStream in ) { ProgressRestoringMonitor monitor = new ProgressRestoringMonitor(); return new MultiExecutionMonitor( new HumanUnderstandableExecutionMonitor( System.out, monitor ), - new OnDemandDetailsExecutionMonitor( System.out, monitor ) ); + new OnDemandDetailsExecutionMonitor( System.out, in, monitor ) ); } private static final ExecutionMonitor INVISIBLE = new ExecutionMonitor() diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/OnDemandDetailsExecutionMonitor.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/OnDemandDetailsExecutionMonitor.java index 2e4c70b35a451..2e2ce23aefc55 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/OnDemandDetailsExecutionMonitor.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/OnDemandDetailsExecutionMonitor.java @@ -21,6 +21,7 @@ import java.io.BufferedReader; import java.io.IOException; +import java.io.InputStream; import java.io.InputStreamReader; import java.io.PrintStream; import java.util.ArrayList; @@ -72,6 +73,7 @@ interface Monitor private final List details = new ArrayList<>(); private final PrintStream out; + private final InputStream in; private final Map> actions = new HashMap<>(); private final CollectingMonitor gcBlockTime = new CollectingMonitor(); private final MeasureDoNothing gcMonitor; @@ -80,9 +82,10 @@ interface Monitor private StageDetails current; private boolean printDetailsOnDone; - public OnDemandDetailsExecutionMonitor( PrintStream out, Monitor monitor ) + public OnDemandDetailsExecutionMonitor( PrintStream out, InputStream in, Monitor monitor ) { this.out = out; + this.in = in; this.monitor = monitor; this.actions.put( "i", Pair.of( "Print more detailed information", 
this::printDetails ) ); this.actions.put( "c", Pair.of( "Print more detailed information about current stage", this::printDetailsForCurrentStage ) ); @@ -188,7 +191,7 @@ private void reactToUserInput() { try { - if ( System.in.available() > 0 ) + if ( in.available() > 0 ) { // don't close this read, since we really don't want to close the underlying System.in - BufferedReader reader = new BufferedReader( new InputStreamReader( System.in ) ); + BufferedReader reader = new BufferedReader( new InputStreamReader( in ) );