Skip to content

Commit

Permalink
[PDI-15650] Text file output: No header is created if header & append…
Browse files Browse the repository at this point in the history
… are checked

-added fixing for case when filename in stream
  • Loading branch information
AliaksandrShuhayeu committed Feb 8, 2017
1 parent e92a5d3 commit 65145b0
Show file tree
Hide file tree
Showing 2 changed files with 95 additions and 19 deletions.
Expand Up @@ -2,7 +2,7 @@
* *
* Pentaho Data Integration * Pentaho Data Integration
* *
* Copyright (C) 2002-2016 by Pentaho : http://www.pentaho.com * Copyright (C) 2002-2017 by Pentaho : http://www.pentaho.com
* *
******************************************************************************* *******************************************************************************
* *
Expand Down Expand Up @@ -69,6 +69,8 @@ public class TextFileOutput extends BaseStep implements StepInterface {


private static final String FILE_COMPRESSION_TYPE_NONE = private static final String FILE_COMPRESSION_TYPE_NONE =
TextFileOutputMeta.fileCompressionTypeCodes[TextFileOutputMeta.FILE_COMPRESSION_TYPE_NONE]; TextFileOutputMeta.fileCompressionTypeCodes[TextFileOutputMeta.FILE_COMPRESSION_TYPE_NONE];
private static final boolean COMPATIBILITY_APPEND_NO_HEADER = "Y".equals(
Const.NVL( System.getProperty( Const.KETTLE_COMPATIBILITY_TEXT_FILE_OUTPUT_APPEND_NO_HEADER ), "N" ) );


public TextFileOutputMeta meta; public TextFileOutputMeta meta;


Expand All @@ -92,12 +94,7 @@ public synchronized boolean processRow( StepMetaInterface smi, StepDataInterface


boolean result = true; boolean result = true;
boolean bEndedLineWrote = false; boolean bEndedLineWrote = false;
boolean newFile; boolean fileExist;
try {
newFile = !getFileObject( buildFilename( environmentSubstitute( meta.getFileName() ), true ), getTransMeta() ).exists();
} catch ( FileSystemException e ) {
throw new KettleException( e );
}
Object[] r = getRow(); // This also waits for a row to be finished. Object[] r = getRow(); // This also waits for a row to be finished.


if ( r != null && first ) { if ( r != null && first ) {
Expand All @@ -122,16 +119,20 @@ public synchronized boolean processRow( StepMetaInterface smi, StepDataInterface


data.fileNameMeta = getInputRowMeta().getValueMeta( data.fileNameFieldIndex ); data.fileNameMeta = getInputRowMeta().getValueMeta( data.fileNameFieldIndex );
data.fileName = data.fileNameMeta.getString( r[data.fileNameFieldIndex] ); data.fileName = data.fileNameMeta.getString( r[data.fileNameFieldIndex] );
setDataWriterForFilename( data.fileName ); fileExist = isFileExist( data.fileName );
setDataWriterForFilename( data.fileName, fileExist );
} else if ( meta.isDoNotOpenNewFileInit() && !meta.isFileNameInField() ) { } else if ( meta.isDoNotOpenNewFileInit() && !meta.isFileNameInField() ) {
fileExist = isFileExist( meta.getFileName() );
// Open a new file here // Open a new file here
// //
openNewFile( meta.getFileName() ); openNewFile( meta.getFileName() );
data.oneFileOpened = true; data.oneFileOpened = true;
initBinaryDataFields(); initBinaryDataFields();
} else {
fileExist = isFileExist( meta.getFileName() );
} }


if ( isNeedWriteHeader( newFile ) ) { if ( isNeedWriteHeader( fileExist ) ) {
writeHeader(); writeHeader();
} }


Expand Down Expand Up @@ -194,7 +195,7 @@ public synchronized boolean processRow( StepMetaInterface smi, StepDataInterface
// //
if ( meta.isFileNameInField() ) { if ( meta.isFileNameInField() ) {
String baseFilename = data.fileNameMeta.getString( r[data.fileNameFieldIndex] ); String baseFilename = data.fileNameMeta.getString( r[data.fileNameFieldIndex] );
setDataWriterForFilename( baseFilename ); setDataWriterForFilename( baseFilename, isFileExist( baseFilename ) );
} }
writeRowToFile( data.outputRowMeta, r ); writeRowToFile( data.outputRowMeta, r );
putRow( data.outputRowMeta, r ); // in case we want it to go further... putRow( data.outputRowMeta, r ); // in case we want it to go further...
Expand All @@ -206,19 +207,30 @@ public synchronized boolean processRow( StepMetaInterface smi, StepDataInterface
return result; return result;
} }


private boolean isNeedWriteHeader( boolean newFile ) { boolean isFileExist( String fileName ) throws KettleException {
boolean fileExist;
try {
fileExist = getFileObject( buildFilename( environmentSubstitute( fileName ), true ), getTransMeta() ).exists();
} catch ( FileSystemException e ) {
throw new KettleException( e );
}
return fileExist;
}

private boolean isNeedWriteHeader( boolean fileExist ) {
if ( meta.isFileNameInField() ) {
return false;
}
if ( !meta.isFileAppended() && ( meta.isHeaderEnabled() || meta.isFooterEnabled() ) ) { // See if we have to write a header-line) if ( !meta.isFileAppended() && ( meta.isHeaderEnabled() || meta.isFooterEnabled() ) ) { // See if we have to write a header-line)
if ( !meta.isFileNameInField() && meta.isHeaderEnabled() && data.outputRowMeta != null ) { if ( meta.isHeaderEnabled() && data.outputRowMeta != null ) {
return true; return true;
} }
} }


//PDI-15650 //PDI-15650
//File Exists=N Flag Set=N Add Header=Y Append=Y //File Exists=N Flag Set=N Add Header=Y Append=Y
//Result = File is created, header is written at top of file (this changed by the fix) //Result = File is created, header is written at top of file (this changed by the fix)
boolean compatibilityAppendNoHeader = "Y".equals( return meta.isHeaderEnabled() && !fileExist && meta.isFileAppended() && !COMPATIBILITY_APPEND_NO_HEADER;
Const.NVL( System.getProperty( Const.KETTLE_COMPATIBILITY_TEXT_FILE_OUTPUT_APPEND_NO_HEADER ), "N" ) );
return meta.isHeaderEnabled() && newFile && meta.isFileAppended() && !compatibilityAppendNoHeader;


} }


Expand All @@ -229,7 +241,7 @@ private boolean isNeedWriteHeader( boolean newFile ) {
* the filename to set the data.writer field for * the filename to set the data.writer field for
* @throws KettleException * @throws KettleException
*/ */
private void setDataWriterForFilename( String filename ) throws KettleException { private void setDataWriterForFilename( String filename, boolean fileExist ) throws KettleException {
// First handle the writers themselves. // First handle the writers themselves.
// If we didn't have a writer yet, we create one. // If we didn't have a writer yet, we create one.
// Basically we open a new file // Basically we open a new file
Expand All @@ -240,9 +252,12 @@ private void setDataWriterForFilename( String filename ) throws KettleException
data.oneFileOpened = true; data.oneFileOpened = true;
data.fileWriterMap.put( filename, data.writer ); data.fileWriterMap.put( filename, data.writer );


boolean isNeedWriteHeader = ( !meta.isFileAppended() && meta.isHeaderEnabled() )
|| ( meta.isHeaderEnabled() && !fileExist && meta.isFileAppended()
&& !COMPATIBILITY_APPEND_NO_HEADER );
// If it's the first time we open it and we have a header, we write a header... // If it's the first time we open it and we have a header, we write a header...
// //
if ( !meta.isFileAppended() && meta.isHeaderEnabled() ) { if ( isNeedWriteHeader ) {
if ( writeHeader() ) { if ( writeHeader() ) {
incrementLinesOutput(); incrementLinesOutput();
} }
Expand Down Expand Up @@ -1020,3 +1035,4 @@ protected OutputStream getOutputStream( String vfsFilename, VariableSpace space,
} }


} }

Expand Up @@ -2,7 +2,7 @@
* *
* Pentaho Data Integration * Pentaho Data Integration
* *
* Copyright (C) 2002-2016 by Pentaho : http://www.pentaho.com * Copyright (C) 2002-2017 by Pentaho : http://www.pentaho.com
* *
******************************************************************************* *******************************************************************************
* *
Expand Down Expand Up @@ -493,8 +493,10 @@ public void testProcessRule_2() throws KettleException {
Mockito.when( stepMockHelper.processRowsStepMetaInterface.isFileAppended() ).thenReturn( true ); Mockito.when( stepMockHelper.processRowsStepMetaInterface.isFileAppended() ).thenReturn( true );
Mockito.when( stepMockHelper.processRowsStepMetaInterface.isHeaderEnabled() ).thenReturn( true ); Mockito.when( stepMockHelper.processRowsStepMetaInterface.isHeaderEnabled() ).thenReturn( true );
Mockito.when( stepMockHelper.processRowsStepMetaInterface.getOutputFields() ).thenReturn( textFileFields ); Mockito.when( stepMockHelper.processRowsStepMetaInterface.getOutputFields() ).thenReturn( textFileFields );
Mockito.when( stepMockHelper.processRowsStepMetaInterface.isDoNotOpenNewFileInit() ).thenReturn( true );

Mockito.when( stepMockHelper.processRowsStepMetaInterface.isFileNameInField() ).thenReturn( false );


Mockito.when( stepMockHelper.processRowsStepMetaInterface.isFileNameInField() ).thenReturn( true );


textFileOutput = textFileOutput =
new TextFileOutputTestHandler( stepMockHelper.stepMeta, stepMockHelper.stepDataInterface, 0, stepMockHelper.transMeta, new TextFileOutputTestHandler( stepMockHelper.stepMeta, stepMockHelper.stepDataInterface, 0, stepMockHelper.transMeta,
Expand All @@ -516,6 +518,7 @@ public void testProcessRule_2() throws KettleException {


TextFileOutput textFileOutputSpy = Mockito.spy( textFileOutput ); TextFileOutput textFileOutputSpy = Mockito.spy( textFileOutput );
Mockito.doNothing().when( textFileOutputSpy ).openNewFile( EMPTY_FILE_NAME ); Mockito.doNothing().when( textFileOutputSpy ).openNewFile( EMPTY_FILE_NAME );
Mockito.doReturn( false ).when( textFileOutputSpy ).isFileExist( TEXT_FILE_OUTPUT_PREFIX + TEXT_FILE_OUTPUT_EXTENSION );
textFileOutputSpy.init( stepMockHelper.processRowsStepMetaInterface, stepMockHelper.initStepDataInterface ); textFileOutputSpy.init( stepMockHelper.processRowsStepMetaInterface, stepMockHelper.initStepDataInterface );
Mockito.when( stepMockHelper.processRowsStepMetaInterface.buildFilename( TEXT_FILE_OUTPUT_PREFIX + TEXT_FILE_OUTPUT_EXTENSION, null, Mockito.when( stepMockHelper.processRowsStepMetaInterface.buildFilename( TEXT_FILE_OUTPUT_PREFIX + TEXT_FILE_OUTPUT_EXTENSION, null,
textFileOutputSpy, 0, null, 0, true, stepMockHelper.processRowsStepMetaInterface ) ). textFileOutputSpy, 0, null, 0, true, stepMockHelper.processRowsStepMetaInterface ) ).
Expand All @@ -526,4 +529,61 @@ public void testProcessRule_2() throws KettleException {


} }



/**
* PDI-15650
* File Exists=N Flag Set=N Add Header=Y Append=Y
* Result = File is created, header is written at top of file (this changed by the fix)
* with file name in stream
*
* @throws KettleException
*/
@Test
public void testProcessRule_2FileNameInField() throws KettleException {

TextFileField tfFieldMock = Mockito.mock( TextFileField.class );
TextFileField[] textFileFields = { tfFieldMock };

Mockito.when( stepMockHelper.initStepMetaInterface.getEndedLine() ).thenReturn( EMPTY_STRING );
Mockito.when( stepMockHelper.initStepMetaInterface.getOutputFields() ).thenReturn( textFileFields );
Mockito.when( stepMockHelper.initStepMetaInterface.isDoNotOpenNewFileInit() ).thenReturn( true );

Mockito.when( stepMockHelper.processRowsStepMetaInterface.getEndedLine() ).thenReturn( EMPTY_STRING );
Mockito.when( stepMockHelper.processRowsStepMetaInterface.getFileName() ).thenReturn( TEXT_FILE_OUTPUT_PREFIX + TEXT_FILE_OUTPUT_EXTENSION );
Mockito.when( stepMockHelper.processRowsStepMetaInterface.isFileAppended() ).thenReturn( true );
Mockito.when( stepMockHelper.processRowsStepMetaInterface.isHeaderEnabled() ).thenReturn( true );
Mockito.when( stepMockHelper.processRowsStepMetaInterface.getOutputFields() ).thenReturn( textFileFields );
Mockito.when( stepMockHelper.processRowsStepMetaInterface.isDoNotOpenNewFileInit() ).thenReturn( true );

Mockito.when( stepMockHelper.processRowsStepMetaInterface.isFileNameInField() ).thenReturn( true );


textFileOutput =
new TextFileOutputTestHandler( stepMockHelper.stepMeta, stepMockHelper.stepDataInterface, 0, stepMockHelper.transMeta,
stepMockHelper.trans );
( (TextFileOutputTestHandler) textFileOutput ).setRow( new Object[] {"data text"} );
RowMetaInterface inputRowMeta = Mockito.mock( RowMetaInterface.class );

ValueMetaInterface valueMetaInterface = Mockito.mock( ValueMetaInterface.class );
Mockito.when( valueMetaInterface.getString( Mockito.anyObject() ) ).thenReturn( TEXT_FILE_OUTPUT_PREFIX + TEXT_FILE_OUTPUT_EXTENSION );
Mockito.when( inputRowMeta.getValueMeta( Mockito.anyInt() ) ).thenReturn( valueMetaInterface );
Mockito.when( inputRowMeta.clone() ).thenReturn( inputRowMeta );

textFileOutput.setInputRowMeta( inputRowMeta );

stepMockHelper.initStepDataInterface.fileWriterMap = new HashMap<>();
stepMockHelper.initStepDataInterface.fileName = TEXT_FILE_OUTPUT_PREFIX + TEXT_FILE_OUTPUT_EXTENSION;

TextFileOutput textFileOutputSpy = Mockito.spy( textFileOutput );
Mockito.doNothing().when( textFileOutputSpy ).openNewFile( EMPTY_FILE_NAME );
Mockito.doReturn( false ).when( textFileOutputSpy ).isFileExist( TEXT_FILE_OUTPUT_PREFIX + TEXT_FILE_OUTPUT_EXTENSION );
textFileOutputSpy.init( stepMockHelper.processRowsStepMetaInterface, stepMockHelper.initStepDataInterface );
Mockito.when( stepMockHelper.processRowsStepMetaInterface.buildFilename( TEXT_FILE_OUTPUT_PREFIX + TEXT_FILE_OUTPUT_EXTENSION, null,
textFileOutputSpy, 0, null, 0, true, stepMockHelper.processRowsStepMetaInterface ) ).
thenReturn( TEXT_FILE_OUTPUT_PREFIX + TEXT_FILE_OUTPUT_EXTENSION );

textFileOutputSpy.processRow( stepMockHelper.processRowsStepMetaInterface, stepMockHelper.initStepDataInterface );
Mockito.verify( textFileOutputSpy, Mockito.times( 1 ) ).writeHeader( );
}

} }

0 comments on commit 65145b0

Please sign in to comment.