[PDI-15246] Group By with include all rows option does not clean up temp files

- Closing streams before deleting temp files
- Refactoring
- Tests written

More details:

**GroupBy**
- Closing streams before deleting files.
- Logging an error if a stream can't be closed.
- Logging a detailed message if the temp file wasn't deleted (see the sketch after this list). This level was chosen because in most cases the temp file can't be deleted precisely because streams to it are still open, and the failure to close a stream is already logged as an error.
- Refactoring
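
The heart of the fix is the order of operations in dispose(): close every stream that points at the temp file first, then delete it, and report a failed delete instead of silently discarding the result. Below is a minimal, self-contained sketch of that pattern; the class and method names (TempFileHolder, cleanUp) are illustrative only and not PDI API, and the plain System.err messages stand in for the localized ones used in the real step.

```java
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;

// Illustrative stand-in for the step's data class; not part of the PDI codebase.
class TempFileHolder {
  File tempFile;
  FileOutputStream fosToTempFile;
  DataOutputStream dosToTempFile;

  // Mirrors the fixed dispose() ordering: close streams, then delete, then log.
  void cleanUp() {
    if ( tempFile == null ) {
      return;
    }
    try {
      // Close the wrapper before the underlying stream; an open handle
      // is exactly what makes File.delete() fail, notably on Windows.
      if ( dosToTempFile != null ) {
        dosToTempFile.close();
        dosToTempFile = null;
      }
      if ( fosToTempFile != null ) {
        fosToTempFile.close();
        fosToTempFile = null;
      }
    } catch ( IOException e ) {
      System.err.println( "Unable to close stream to " + tempFile.getPath() );
    }
    // Check the return value of delete() instead of discarding it.
    if ( !tempFile.delete() ) {
      System.err.println( "Unable to delete temporary file " + tempFile.getPath() );
    }
  }

  public static void main( String[] args ) throws IOException {
    TempFileHolder holder = new TempFileHolder();
    holder.tempFile = File.createTempFile( "grp", ".tmp" );
    holder.fosToTempFile = new FileOutputStream( holder.tempFile );
    holder.dosToTempFile = new DataOutputStream( holder.fosToTempFile );
    holder.cleanUp(); // closes both streams, then deletes the file
  }
}
```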

**GroupByData**
- Renamed the stream fields to indicate that they relate to the temp file only.

**GroupByMeta**
- Refactoring some field names

**GroupByTest**
- Updated to JUnit 4.0
- Added a positive test for temp file deletion (sketched after this list)
- No negative test added, as in this case it would be equivalent to testing the logger.
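
A rough idea of what such a positive test can look like under JUnit 4 is sketched below. It exercises only the close-then-delete contract with plain java.io; the class name TempFileCleanupTest and the scenario are hypothetical, and the real test in this commit drives the GroupBy step itself.

```java
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import java.io.DataOutputStream;
import java.io.File;
import java.io.FileOutputStream;

import org.junit.Test;

public class TempFileCleanupTest {

  @Test
  public void tempFileIsDeletedAfterStreamsAreClosed() throws Exception {
    File tempFile = File.createTempFile( "grp", ".tmp" );
    FileOutputStream fos = new FileOutputStream( tempFile );
    DataOutputStream dos = new DataOutputStream( fos );
    dos.writeLong( 42L ); // simulate a row spilled to the temp file

    // The fixed dispose() order: close streams first, then delete.
    dos.close();
    fos.close();
    assertTrue( "temp file should be deletable once streams are closed", tempFile.delete() );
    assertFalse( tempFile.exists() );
  }
}
```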

**messages**
- Added a message notifying that the temp file wasn't deleted
- Refactored the message notifying that a connection to a file wasn't closed; it now includes the path of the file where the error occurred.
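
The two message keys involved are visible in the diff below (GroupBy.Exception.UnableToCloseInputStream and GroupBy.Exception.UnableToDeleteTemporaryFile). As an illustration of the parameterized form, the bundle entries would look roughly like this; only the keys and the {0} path parameter come from the diff, the English wording is assumed:

```properties
# Assumed wording; keys and the {0} parameter match the code in the diff.
GroupBy.Exception.UnableToCloseInputStream=Unable to close a stream to file [{0}]
GroupBy.Exception.UnableToDeleteTemporaryFile=Unable to delete the temporary file [{0}]
```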
Ivan Nikolaichuk authored and committed on May 13, 2016
1 parent 85bf167 commit 9311efc
Showing 5 changed files with 159 additions and 119 deletions.
188 changes: 101 additions & 87 deletions engine/src/org/pentaho/di/trans/steps/groupby/GroupBy.java
@@ -115,11 +115,11 @@ public boolean processRow( StepMetaInterface smi, StepDataInterface sdi ) throws
data.counts = new long[ meta.getSubjectField().length ];
data.subjectnrs = new int[ meta.getSubjectField().length ];

-    data.cumulativeSumSourceIndexes = new ArrayList<Integer>();
-    data.cumulativeSumTargetIndexes = new ArrayList<Integer>();
+    data.cumulativeSumSourceIndexes = new ArrayList<>();
+    data.cumulativeSumTargetIndexes = new ArrayList<>();

-    data.cumulativeAvgSourceIndexes = new ArrayList<Integer>();
-    data.cumulativeAvgTargetIndexes = new ArrayList<Integer>();
+    data.cumulativeAvgSourceIndexes = new ArrayList<>();
+    data.cumulativeAvgTargetIndexes = new ArrayList<>();

for ( int i = 0; i < meta.getSubjectField().length; i++ ) {
if ( meta.getAggregateType()[ i ] == GroupByMeta.TYPE_GROUP_COUNT_ANY ) {
@@ -420,12 +420,12 @@ private boolean sameGroup( Object[] previous, Object[] r ) throws KettleValueExc
/**
* used for junits in GroupByAggregationNullsTest
*
- * @param r
+ * @param row
 * @throws KettleValueException
 */
-@SuppressWarnings( "unchecked" ) void calcAggregate( Object[] r ) throws KettleValueException {
+@SuppressWarnings( "unchecked" ) void calcAggregate( Object[] row ) throws KettleValueException {
  for ( int i = 0; i < data.subjectnrs.length; i++ ) {
-    Object subj = r[ data.subjectnrs[ i ] ];
+    Object subj = row[ data.subjectnrs[ i ] ];
ValueMetaInterface subjMeta = data.inputRowMeta.getValueMeta( data.subjectnrs[ i ] );
Object value = data.agg[ i ];
ValueMetaInterface valueMeta = data.aggMeta.getValueMeta( i );
@@ -673,8 +673,6 @@ private void initGroupMeta( RowMetaInterface previousRowMeta ) throws KettleValu
for ( int i = 0; i < data.groupnrs.length; i++ ) {
data.groupMeta.addValueMeta( previousRowMeta.getValueMeta( data.groupnrs[ i ] ) );
  }
-
-    return;
}

/**
@@ -684,67 +682,71 @@ private void initGroupMeta( RowMetaInterface previousRowMeta ) throws KettleValu
* @throws KettleValueException
*/
Object[] getAggregateResult() throws KettleValueException {
+
+    if ( data.subjectnrs == null ) {
+      return new Object[ 0 ];
+    }

    Object[] result = new Object[ data.subjectnrs.length ];

-    if ( data.subjectnrs != null ) {
-      for ( int i = 0; i < data.subjectnrs.length; i++ ) {
-        Object ag = data.agg[ i ];
-        switch ( meta.getAggregateType()[ i ] ) {
-          case GroupByMeta.TYPE_GROUP_SUM:
-            break;
-          case GroupByMeta.TYPE_GROUP_AVERAGE:
-            ag =
-              ValueDataUtil.divide( data.aggMeta.getValueMeta( i ), ag, new ValueMeta(
-                "c", ValueMetaInterface.TYPE_INTEGER ), new Long( data.counts[ i ] ) );
-            break;
-          case GroupByMeta.TYPE_GROUP_MEDIAN:
-          case GroupByMeta.TYPE_GROUP_PERCENTILE:
-            double percentile = 50.0;
-            if ( meta.getAggregateType()[ i ] == GroupByMeta.TYPE_GROUP_PERCENTILE ) {
-              percentile = Double.parseDouble( meta.getValueField()[ i ] );
-            }
-            @SuppressWarnings( "unchecked" )
-            List<Double> valuesList = (List<Double>) data.agg[ i ];
-            double[] values = new double[ valuesList.size() ];
-            for ( int v = 0; v < values.length; v++ ) {
-              values[ v ] = valuesList.get( v );
-            }
-            ag = new Percentile().evaluate( values, percentile );
-            break;
-          case GroupByMeta.TYPE_GROUP_COUNT_ANY:
-          case GroupByMeta.TYPE_GROUP_COUNT_ALL:
-            ag = new Long( data.counts[ i ] );
-            break;
-          case GroupByMeta.TYPE_GROUP_COUNT_DISTINCT:
-            break;
-          case GroupByMeta.TYPE_GROUP_MIN:
-            break;
-          case GroupByMeta.TYPE_GROUP_MAX:
-            break;
-          case GroupByMeta.TYPE_GROUP_STANDARD_DEVIATION:
-            if ( ag == null ) {
-              // PMD-1037 - when all input data is null ag is null, npe on access ag
-              break;
-            }
-            double sum = (Double) ag / data.counts[ i ];
-            ag = Double.valueOf( Math.sqrt( sum ) );
-            break;
-          case GroupByMeta.TYPE_GROUP_CONCAT_COMMA:
-          case GroupByMeta.TYPE_GROUP_CONCAT_STRING:
-            ag = ( (StringBuilder) ag ).toString();
-            break;
-          default:
-            break;
-        }
-        if ( ag == null && allNullsAreZero ) {
-          // PDI-10250, 6960 seems all rows for min function was nulls...
-          // get output subject meta based on original subject meta calculation
-          ValueMetaInterface vm = data.aggMeta.getValueMeta( i );
-          ag = ValueDataUtil.getZeroForValueMetaType( vm );
-        }
-        result[ i ] = ag;
-      }
-    }
+    for ( int i = 0; i < data.subjectnrs.length; i++ ) {
+      Object ag = data.agg[ i ];
+      switch ( meta.getAggregateType()[ i ] ) {
+        case GroupByMeta.TYPE_GROUP_SUM:
+          break;
+        case GroupByMeta.TYPE_GROUP_AVERAGE:
+          ag =
+            ValueDataUtil.divide( data.aggMeta.getValueMeta( i ), ag, new ValueMeta(
+              "c", ValueMetaInterface.TYPE_INTEGER ), new Long( data.counts[ i ] ) );
+          break;
+        case GroupByMeta.TYPE_GROUP_MEDIAN:
+        case GroupByMeta.TYPE_GROUP_PERCENTILE:
+          double percentile = 50.0;
+          if ( meta.getAggregateType()[ i ] == GroupByMeta.TYPE_GROUP_PERCENTILE ) {
+            percentile = Double.parseDouble( meta.getValueField()[ i ] );
+          }
+          @SuppressWarnings( "unchecked" )
+          List<Double> valuesList = (List<Double>) data.agg[ i ];
+          double[] values = new double[ valuesList.size() ];
+          for ( int v = 0; v < values.length; v++ ) {
+            values[ v ] = valuesList.get( v );
+          }
+          ag = new Percentile().evaluate( values, percentile );
+          break;
+        case GroupByMeta.TYPE_GROUP_COUNT_ANY:
+        case GroupByMeta.TYPE_GROUP_COUNT_ALL:
+          ag = new Long( data.counts[ i ] );
+          break;
+        case GroupByMeta.TYPE_GROUP_COUNT_DISTINCT:
+          break;
+        case GroupByMeta.TYPE_GROUP_MIN:
+          break;
+        case GroupByMeta.TYPE_GROUP_MAX:
+          break;
+        case GroupByMeta.TYPE_GROUP_STANDARD_DEVIATION:
+          if ( ag == null ) {
+            // PMD-1037 - when all input data is null ag is null, npe on access ag
+            break;
+          }
+          double sum = (Double) ag / data.counts[ i ];
+          ag = Double.valueOf( Math.sqrt( sum ) );
+          break;
+        case GroupByMeta.TYPE_GROUP_CONCAT_COMMA:
+        case GroupByMeta.TYPE_GROUP_CONCAT_STRING:
+          ag = ( (StringBuilder) ag ).toString();
+          break;
+        default:
+          break;
+      }
+      if ( ag == null && allNullsAreZero ) {
+        // PDI-10250, 6960 seems all rows for min function was nulls...
+        // get output subject meta based on original subject meta calculation
+        ValueMetaInterface vm = data.aggMeta.getValueMeta( i );
+        ag = ValueDataUtil.getZeroForValueMetaType( vm );
+      }
+      result[ i ] = ag;
+    }

    return result;
  }
@@ -757,8 +759,8 @@ private void addToBuffer( Object[] row ) throws KettleFileException {
data.tempFile =
File.createTempFile(
meta.getPrefix(), ".tmp", new File( environmentSubstitute( meta.getDirectory() ) ) );
-      data.fos = new FileOutputStream( data.tempFile );
-      data.dos = new DataOutputStream( data.fos );
+      data.fosToTempFile = new FileOutputStream( data.tempFile );
+      data.dosToTempFile = new DataOutputStream( data.fosToTempFile );
data.firstRead = true;
} catch ( IOException e ) {
throw new KettleFileException( BaseMessages.getString(
@@ -767,7 +769,7 @@ private void addToBuffer( Object[] row ) throws KettleFileException {
}
// OK, save the oldest rows to disk!
Object[] oldest = data.bufferList.get( 0 );
-    data.inputRowMeta.writeData( data.dos, oldest );
+    data.inputRowMeta.writeData( data.dosToTempFile, oldest );
data.bufferList.remove( 0 );
data.rowsOnFile++;
}
@@ -778,8 +780,8 @@ private Object[] getRowFromBuffer() throws KettleFileException {
if ( data.firstRead ) {
// Open the inputstream first...
try {
-        data.fis = new FileInputStream( data.tempFile );
-        data.dis = new DataInputStream( data.fis );
+        data.fisToTmpFile = new FileInputStream( data.tempFile );
+        data.disToTmpFile = new DataInputStream( data.fisToTmpFile );
data.firstRead = false;
} catch ( IOException e ) {
throw new KettleFileException( BaseMessages.getString(
@@ -790,7 +792,7 @@ private Object[] getRowFromBuffer() throws KettleFileException {
// Read one row from the file!
Object[] row;
try {
-      row = data.inputRowMeta.readData( data.dis );
+      row = data.inputRowMeta.readData( data.disToTmpFile );
} catch ( SocketTimeoutException e ) {
throw new KettleFileException( e ); // Shouldn't happen on files
}
@@ -810,34 +812,34 @@

private void closeOutput() throws KettleFileException {
try {
-      if ( data.dos != null ) {
-        data.dos.close();
-        data.dos = null;
+      if ( data.dosToTempFile != null ) {
+        data.dosToTempFile.close();
+        data.dosToTempFile = null;
      }
-      if ( data.fos != null ) {
-        data.fos.close();
-        data.fos = null;
+      if ( data.fosToTempFile != null ) {
+        data.fosToTempFile.close();
+        data.fosToTempFile = null;
}
data.firstRead = true;
} catch ( IOException e ) {
throw new KettleFileException(
-        BaseMessages.getString( PKG, "GroupBy.Exception.UnableToCloseInputStream" ), e );
+        BaseMessages.getString( PKG, "GroupBy.Exception.UnableToCloseInputStream", data.tempFile.getPath() ), e );
}
}

private void closeInput() throws KettleFileException {
try {
-      if ( data.fis != null ) {
-        data.fis.close();
-        data.fis = null;
+      if ( data.fisToTmpFile != null ) {
+        data.fisToTmpFile.close();
+        data.fisToTmpFile = null;
      }
-      if ( data.dis != null ) {
-        data.dis.close();
-        data.dis = null;
+      if ( data.disToTmpFile != null ) {
+        data.disToTmpFile.close();
+        data.disToTmpFile = null;
}
} catch ( IOException e ) {
throw new KettleFileException(
-        BaseMessages.getString( PKG, "GroupBy.Exception.UnableToCloseInputStream" ), e );
+        BaseMessages.getString( PKG, "GroupBy.Exception.UnableToCloseInputStream", data.tempFile.getPath() ), e );
}
}

@@ -846,7 +848,7 @@ public boolean init( StepMetaInterface smi, StepDataInterface sdi ) {
data = (GroupByData) sdi;

if ( super.init( smi, sdi ) ) {
-      data.bufferList = new ArrayList<Object[]>();
+      data.bufferList = new ArrayList<>();

data.rowsOnFile = 0;

@@ -857,7 +859,19 @@

public void dispose( StepMetaInterface smi, StepDataInterface sdi ) {
if ( data.tempFile != null ) {
-      data.tempFile.delete();
+      try {
+        closeInput();
+        closeOutput();
+      } catch ( KettleFileException e ) {
+        log.logError( e.getLocalizedMessage() );
+      }
+
+      boolean tempFileDeleted = data.tempFile.delete();
+
+      if ( !tempFileDeleted && log.isDetailed() ) {
+        log.logDetailed(
+          BaseMessages.getString( PKG, "GroupBy.Exception.UnableToDeleteTemporaryFile", data.tempFile.getPath() ) );
+      }
}

super.dispose( smi, sdi );
engine/src/org/pentaho/di/trans/steps/groupby/GroupByData.java
@@ -66,16 +66,16 @@ public class GroupByData extends BaseStepData implements StepDataInterface {

public File tempFile;

-  public FileOutputStream fos;
+  public FileOutputStream fosToTempFile;

-  public DataOutputStream dos;
+  public DataOutputStream dosToTempFile;

public int rowsOnFile;

public boolean firstRead;

-  public FileInputStream fis;
-  public DataInputStream dis;
+  public FileInputStream fisToTmpFile;
+  public DataInputStream disToTmpFile;

public Object[] groupResult;
