Skip to content

Commit

Permalink
Fixes a MultiReadable issue and avoids System.in in tests
Browse files Browse the repository at this point in the history
  • Loading branch information
tinwelint committed Dec 4, 2017
1 parent bf3e6db commit ae92664
Show file tree
Hide file tree
Showing 6 changed files with 60 additions and 14 deletions.
Expand Up @@ -139,6 +139,14 @@ public int read( char[] into, int offset, int length ) throws IOException
int read = current.read( into, offset + totalRead, length - totalRead );
if ( read == -1 )
{
if ( totalRead > 0 )
{
// Something has been read, but we couldn't fulfill the request with the current source.
// Return what we've read so far so that we don't mix multiple sources into the same read,
// for source traceability reasons.
return totalRead;
}

if ( !goToNextSource() )
{
break;
Expand All @@ -151,13 +159,10 @@ public int read( char[] into, int offset, int length ) throws IOException
requiresNewLine = false;
}
}
else
else if ( read > 0 )
{
totalRead += read;
if ( read > 0 )
{
checkNewLineRequirement( into, offset + totalRead - 1 );
}
checkNewLineRequirement( into, offset + totalRead - 1 );
}
}
return totalRead;
Expand Down
Expand Up @@ -29,7 +29,6 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import static org.neo4j.csv.reader.Readables.wrap;

public class MultiReadableTest
Expand Down Expand Up @@ -106,6 +105,33 @@ public void shouldTrackAbsolutePosition() throws Exception
assertFalse( buffer.hasAvailable() );
}

@Test
public void shouldNotCrossSourcesInOneRead() throws Exception
{
// given
String source1 = "abcdefghijklm";
String source2 = "nopqrstuvwxyz";
String[][] data = new String[][] { {source1}, {source2} };
CharReadable readable = new MultiReadable( readerIteratorFromStrings( data, '\n' ) );

// when
char[] target = new char[source1.length() + source2.length() / 2];
int read = readable.read( target, 0, target.length );

// then
assertEquals( source1.length() + 1/*added newline-char*/, read );

// and when
target = new char[source2.length()];
read = readable.read( target, 0, target.length );

// then
assertEquals( source2.length(), read );

read = readable.read( target, 0, target.length );
assertEquals( 1/*added newline-char*/, read );
}

private static final Configuration CONFIG = new Configuration.Overridden( Configuration.DEFAULT )
{
@Override
Expand Down
Expand Up @@ -116,8 +116,8 @@ public void doImport() throws IOException
badCollector,
configuration.maxNumberOfProcessors(), !ignoreBadRelationships );

ImportTool.doImport( outsideWorld.errorStream(), outsideWorld.errorStream(), storeDir, logsDir, reportFile, fs,
nodesFiles, relationshipsFiles, false, input, this.databaseConfig, badOutput, configuration );
ImportTool.doImport( outsideWorld.errorStream(), outsideWorld.errorStream(), System.in, storeDir, logsDir,
reportFile, fs, nodesFiles, relationshipsFiles, false, input, this.databaseConfig, badOutput, configuration );
}

private boolean isIgnoringSomething()
Expand Down
Expand Up @@ -20,8 +20,10 @@
package org.neo4j.tooling;

import java.io.BufferedOutputStream;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.PrintStream;
import java.nio.charset.Charset;
Expand Down Expand Up @@ -85,6 +87,7 @@
import static org.neo4j.io.ByteUnit.mebiBytes;
import static org.neo4j.io.fs.FileUtils.readTextFile;
import static org.neo4j.kernel.configuration.Settings.parseLongWithUnit;
import static org.neo4j.kernel.impl.store.PropertyType.EMPTY_BYTE_ARRAY;
import static org.neo4j.kernel.impl.util.Converters.withDefault;
import static org.neo4j.unsafe.impl.batchimport.AdditionalInitialIds.EMPTY;
import static org.neo4j.unsafe.impl.batchimport.Configuration.BAD_FILE_NAME;
Expand Down Expand Up @@ -413,6 +416,7 @@ public static void main( String[] incomingArguments, boolean defaultSettingsSuit
File badFile = null;
Long maxMemory;
Boolean defaultHighIO;
InputStream in;

boolean success = false;
try ( FileSystemAbstraction fs = new DefaultFileSystemAbstraction() )
Expand Down Expand Up @@ -470,8 +474,9 @@ public static void main( String[] incomingArguments, boolean defaultSettingsSuit
relationshipData( inputEncoding, relationshipsFiles ), defaultFormatRelationshipFileHeader(),
idType, csvConfiguration( args, defaultSettingsSuitableForTests ), badCollector,
configuration.maxNumberOfProcessors(), !skipBadRelationships );
in = defaultSettingsSuitableForTests ? new ByteArrayInputStream( EMPTY_BYTE_ARRAY ) : System.in;

doImport( out, err, storeDir, logsDir, badFile, fs, nodesFiles, relationshipsFiles,
doImport( out, err, in, storeDir, logsDir, badFile, fs, nodesFiles, relationshipsFiles,
enableStacktrace, input, dbConfig, badOutput, configuration );

success = true;
Expand Down Expand Up @@ -540,7 +545,7 @@ static Long parseMaxMemory( String maxMemoryString )
return null;
}

public static void doImport( PrintStream out, PrintStream err, File storeDir, File logsDir, File badFile,
public static void doImport( PrintStream out, PrintStream err, InputStream in, File storeDir, File logsDir, File badFile,
FileSystemAbstraction fs, Collection<Option<File[]>> nodesFiles,
Collection<Option<File[]>> relationshipsFiles, boolean enableStacktrace, Input input,
Config dbConfig, OutputStream badOutput,
Expand All @@ -559,7 +564,7 @@ public static void doImport( PrintStream out, PrintStream err, File storeDir, Fi
null, // no external page cache
configuration,
logService,
ExecutionMonitors.defaultVisible(),
ExecutionMonitors.defaultVisible( in ),
EMPTY,
dbConfig,
RecordFormatSelector.selectForConfig( dbConfig, logService.getInternalLogProvider() ) );
Expand Down
Expand Up @@ -19,6 +19,8 @@
*/
package org.neo4j.unsafe.impl.batchimport.staging;

import java.io.InputStream;

/**
* Common {@link ExecutionMonitor} implementations.
*/
Expand All @@ -30,11 +32,16 @@ private ExecutionMonitors()
}

public static ExecutionMonitor defaultVisible()
{
return defaultVisible( System.in );
}

public static ExecutionMonitor defaultVisible( InputStream in )
{
ProgressRestoringMonitor monitor = new ProgressRestoringMonitor();
return new MultiExecutionMonitor(
new HumanUnderstandableExecutionMonitor( System.out, monitor ),
new OnDemandDetailsExecutionMonitor( System.out, monitor ) );
new OnDemandDetailsExecutionMonitor( System.out, in, monitor ) );
}

private static final ExecutionMonitor INVISIBLE = new ExecutionMonitor()
Expand Down
Expand Up @@ -21,6 +21,7 @@

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.PrintStream;
import java.util.ArrayList;
Expand Down Expand Up @@ -72,6 +73,7 @@ interface Monitor

private final List<StageDetails> details = new ArrayList<>();
private final PrintStream out;
private final InputStream in;
private final Map<String,Pair<String,Runnable>> actions = new HashMap<>();
private final CollectingMonitor gcBlockTime = new CollectingMonitor();
private final MeasureDoNothing gcMonitor;
Expand All @@ -80,9 +82,10 @@ interface Monitor
private StageDetails current;
private boolean printDetailsOnDone;

public OnDemandDetailsExecutionMonitor( PrintStream out, Monitor monitor )
public OnDemandDetailsExecutionMonitor( PrintStream out, InputStream in, Monitor monitor )
{
this.out = out;
this.in = in;
this.monitor = monitor;
this.actions.put( "i", Pair.of( "Print more detailed information", this::printDetails ) );
this.actions.put( "c", Pair.of( "Print more detailed information about current stage", this::printDetailsForCurrentStage ) );
Expand Down Expand Up @@ -188,7 +191,7 @@ private void reactToUserInput()
{
try
{
if ( System.in.available() > 0 )
if ( in.available() > 0 )
{
// don't close this read, since we really don't want to close the underlying System.in
BufferedReader reader = new BufferedReader( new InputStreamReader( System.in ) );
Expand Down

0 comments on commit ae92664

Please sign in to comment.