Skip to content

Commit

Permalink
[PDI-14172] - Text file input Repeat doesn't work
Browse files Browse the repository at this point in the history
- applying changes from 9e3e72d to new implementation of the step
  • Loading branch information
Andrey Khayrutdinov committed Sep 9, 2015
1 parent 3a281d9 commit 218b3da
Show file tree
Hide file tree
Showing 2 changed files with 115 additions and 32 deletions.
Expand Up @@ -339,7 +339,7 @@ public static final Object[] convertLineToRow( LogChannelInterface log, TextFile
String escapeCharacter, FileErrorHandler errorHandler,
BaseFileInputStepMeta.AdditionalOutputFields additionalOutputFields, String shortFilename, String path,
boolean hidden, Date modificationDateTime, String uri, String rooturi, String extension, long size )
throws KettleException {
throws KettleException {
if ( textFileLine == null || textFileLine.line == null ) {
return null;
}
Expand Down Expand Up @@ -383,9 +383,13 @@ public static final Object[] convertLineToRow( LogChannelInterface log, TextFile
int trim_type = fieldnr < nrfields ? f.getTrimType() : ValueMetaInterface.TRIM_TYPE_NONE;

if ( fieldnr < strings.length ) {
String pol = strings[fieldnr];
String pol = strings[ fieldnr ];
try {
value = valueMeta.convertDataFromString( pol, convertMeta, nullif, ifnull, trim_type );
if ( valueMeta.isNull( pol ) ) {
value = null;
} else {
value = valueMeta.convertDataFromString( pol, convertMeta, nullif, ifnull, trim_type );
}
} catch ( Exception e ) {
// OK, give some feedback!
String message =
Expand Down
Expand Up @@ -46,6 +46,7 @@
import org.pentaho.di.trans.step.errorhandling.FileErrorHandler;
import org.pentaho.di.trans.steps.StepMockUtil;
import org.pentaho.di.trans.steps.fileinput.BaseFileInputField;
import org.pentaho.di.utils.TestUtils;

import static org.junit.Assert.*;

Expand Down Expand Up @@ -117,30 +118,23 @@ public void test_PDI695() throws KettleFileException, UnsupportedEncodingExcepti

@Test
public void readWrappedInputWithoutHeaders() throws Exception {
final String virtualFile = "ram://pdi-2607.txt";
KettleVFS.getFileObject( virtualFile ).createFile();

final String content =
new StringBuilder().append( "r1c1" ).append( '\n' ).append( ";r1c2\n" ).append( "r2c1" ).append( '\n' ).append(
";r2c2" ).toString();
ByteArrayOutputStream bos = new ByteArrayOutputStream();
bos.write( content.getBytes() );

OutputStream os = KettleVFS.getFileObject( virtualFile ).getContent().getOutputStream();
IOUtils.copy( new ByteArrayInputStream( bos.toByteArray() ), os );
os.close();
final String content = new StringBuilder()
.append( "r1c1" ).append( '\n' ).append( ";r1c2\n" )
.append( "r2c1" ).append( '\n' ).append( ";r2c2" )
.toString();
final String virtualFile = createVirtualFile( "pdi-2607.txt", content );

TextFileInputMeta meta = new TextFileInputMeta();
meta.content.lineWrapped = true ;
meta.content.lineWrapped = true;
meta.content.nrWraps = 1;
meta.inputFiles.inputFields =
new BaseFileInputField[] { new BaseFileInputField( "col1", -1, -1 ), new BaseFileInputField( "col2", -1, -1 ) };
meta.content.fileCompression = "None" ;
meta.content.fileType = "CSV" ;
meta.content.header= false ;
meta.content.nrHeaderLines= -1 ;
meta.content.footer =false ;
meta.content.nrFooterLines = -1 ;
new BaseFileInputField[] { field( "col1" ), field( "col2" ) };
meta.content.fileCompression = "None";
meta.content.fileType = "CSV";
meta.content.header = false;
meta.content.nrHeaderLines = -1;
meta.content.footer = false;
meta.content.nrFooterLines = -1;

TextFileInputData data = new TextFileInputData();
data.files = new FileInputList();
Expand All @@ -153,35 +147,120 @@ public void readWrappedInputWithoutHeaders() throws Exception {
data.dataErrorLineHandler = Mockito.mock( FileErrorHandler.class );
data.fileFormatType = TextFileInputMeta.FILE_FORMAT_UNIX;
data.separator = ";";
data.filterProcessor = new TextFileFilterProcessor( new TextFileFilter[0], new Variables() );
data.filterProcessor = new TextFileFilterProcessor( new TextFileFilter[ 0 ], new Variables() );
data.filePlayList = new FilePlayListAll();


RowSet output = new BlockingRowSet( 5 );
TextFileInput input = StepMockUtil.getStep( TextFileInput.class, TextFileInputMeta.class, "test" );
input.setOutputRowSets( Collections.singletonList( output ) );
while ( input.processRow( meta, data ) ) {
// wait until the step completes executing
}
executeStep( meta, data, output, 2 );

Object[] row1 = output.getRowImmediate();
assertRow( row1, "r1c1", "r1c2" );

Object[] row2 = output.getRowImmediate();
assertRow( row2, "r2c1", "r2c2" );

KettleVFS.getFileObject( virtualFile ).delete();

deleteVfsFile( virtualFile );
}

@Test
public void readInputWithMissedValues() throws Exception {
final String virtualFile = createVirtualFile( "pdi-14172.txt", "1,1,1\n", "2,,2\n" );

TextFileInputMeta meta = new TextFileInputMeta();
BaseFileInputField field2 = field( "col2" );
field2.setRepeated( true );
meta.inputFiles.inputFields =
new BaseFileInputField[] { field( "col1" ), field2, field( "col3" ) };
meta.content.fileCompression = "None";
meta.content.fileType = "CSV";
meta.content.header = false;
meta.content.nrHeaderLines = -1;
meta.content.footer = false;
meta.content.nrFooterLines = -1;

TextFileInputData data = new TextFileInputData();
data.files = new FileInputList();
data.files.addFile( KettleVFS.getFileObject( virtualFile ) );

data.outputRowMeta = new RowMeta();
data.outputRowMeta.addValueMeta( new ValueMetaString( "col1" ) );
data.outputRowMeta.addValueMeta( new ValueMetaString( "col2" ) );
data.outputRowMeta.addValueMeta( new ValueMetaString( "col3" ) );

data.dataErrorLineHandler = Mockito.mock( FileErrorHandler.class );
data.fileFormatType = TextFileInputMeta.FILE_FORMAT_UNIX;
data.separator = ",";
data.filterProcessor = new TextFileFilterProcessor( new TextFileFilter[ 0 ], new Variables() );
data.filePlayList = new FilePlayListAll();


RowSet output = new BlockingRowSet( 5 );
executeStep( meta, data, output, 2 );

Object[] row1 = output.getRowImmediate();
assertRow( row1, "1", "1", "1" );

Object[] row2 = output.getRowImmediate();
assertRow( row2, "2", "1", "2" );


deleteVfsFile( virtualFile );
}

private void executeStep( TextFileInputMeta meta, TextFileInputData data, RowSet output, int expectedRounds )
throws Exception {
TextFileInput input = StepMockUtil.getStep( TextFileInput.class, TextFileInputMeta.class, "test" );
input.setOutputRowSets( Collections.singletonList( output ) );
int i = 0;
while ( input.processRow( meta, data ) && i < expectedRounds ) {
i++;
}

assertEquals( "The amount of executions should be equal to expected", expectedRounds, i );
}

private static String createVirtualFile( String filename, String... rows ) throws Exception {
String virtualFile = TestUtils.createRamFile( filename );

StringBuilder content = new StringBuilder();
if ( rows != null ) {
for ( String row : rows ) {
content.append( row );
}
}
ByteArrayOutputStream bos = new ByteArrayOutputStream();
bos.write( content.toString().getBytes() );

OutputStream os = KettleVFS.getFileObject( virtualFile ).getContent().getOutputStream();
try {
IOUtils.copy( new ByteArrayInputStream( bos.toByteArray() ), os );
} finally {
os.close();
}

return virtualFile;
}

private static void deleteVfsFile( String path ) throws Exception {
TestUtils.getFileObject( path ).delete();
}

private static BaseFileInputField field( String name ) {
return new BaseFileInputField( name, -1, -1 );
}

private static void assertRow( Object[] row, Object... values ) {
assertNotNull( row );
assertTrue( String.format( "%d < %d", row.length, values.length ), row.length >= values.length );
int i = 0;
while ( i < values.length ) {
assertEquals( values[i], row[i] );
assertEquals( values[ i ], row[ i ] );
i++;
}
while ( i < row.length ) {
assertNull( row[i] );
assertNull( row[ i ] );
i++;
}
}
Expand Down

0 comments on commit 218b3da

Please sign in to comment.