Skip to content

Commit

Permalink
[PDI-15162] "Running in parallel", CSV input step misses some records. (
Browse files Browse the repository at this point in the history
#2428)

*  [PDI-15162]  "Running in parallel", CSV input step misses some records.
 - Deciding whether to skip a line only after checking whether we are actually starting at the beginning of a new line that wasn't read by the previous step (see the javadoc or the description below for more details)
 - Tests for parallel work of CSVInputStep written.

More detailed:
We need to skip a row only if the line we are currently on has been partly read by the previous step.
In other words, we DON'T skip a line if we are just beginning to read it from its first symbol.
We have to do some work for this: read the last byte of the previous step and make sure that it is a new line byte.
But that is not enough. There could be a situation where a new line is indicated by the '\r\n' sequence. If we are
in the middle of this sequence, we want to skip the trailing '\n' and not include it in our line.

So, we DON'T skip a line only if the previous char is a new line indicator AND we are not in the middle of '\r\n'.

* [CLEANUP] Checkstyle applied
  • Loading branch information
IvanNikolaychuk authored and Bryan Rosander committed Apr 14, 2016
1 parent be58bf2 commit b1cd03f
Show file tree
Hide file tree
Showing 3 changed files with 373 additions and 7 deletions.
54 changes: 50 additions & 4 deletions engine/src/org/pentaho/di/trans/steps/csvinput/CsvInput.java
Expand Up @@ -2,7 +2,7 @@
*
* Pentaho Data Integration
*
* Copyright (C) 2002-2013 by Pentaho : http://www.pentaho.com
* Copyright (C) 2002-2016 by Pentaho : http://www.pentaho.com
*
*******************************************************************************
*
Expand Down Expand Up @@ -341,9 +341,10 @@ private boolean openNextFile() throws KettleException {
if ( data.bytesToSkipInFirstFile > 0 ) {
data.fc.position( data.bytesToSkipInFirstFile );

// Now, we need to skip the first row, until the first CR that is.
//
readOneRow( true, true );
// evaluate whether there is a need to skip a row
if ( needToSkipRow() ) {
readOneRow( true, true );
}
}
}

Expand Down Expand Up @@ -387,6 +388,50 @@ private boolean openNextFile() throws KettleException {
}
}

/**
 * Decides whether the row at the current file position has to be discarded.
 *
 * A row is skipped only when the previous step has already read <b>part</b> of it. If we start exactly on the
 * first symbol of a line, the row is ours and must be kept. To find out, the byte just before our start position
 * is inspected: it has to be a new line byte. That alone is not sufficient, because a new line may be written as
 * '\r\n'. When our start position falls <b>between</b> '\r' and '\n', the trailing '\n' must not become part of
 * our line, so the row is skipped in that case as well.
 *
 * Summary: the row is NOT skipped only if the previous char is a new line indicator AND we are not positioned
 * between '\r' and '\n'.
 *
 * @return the result of the second {@code data.newLineFound()} check when the previous byte is a new line;
 *         {@code true} in every other case, including when an {@link IOException} occurs
 */
private boolean needToSkipRow() {
  try {
    // step back one byte, onto the last byte that belongs to the previous step
    data.fc.position( data.fc.position() - 1 );
    // make sure the buffer holds data before it is inspected
    data.resizeBufferIfNeeded();

    // is the last symbol of the previous step a new line?
    final boolean previousByteIsNewLine = data.newLineFound();

    // Advance to the first char of our own part. The total of bytes read is deliberately not increased:
    // the byte we just looked at is content of another step and was read for evaluation only.
    data.moveEndBufferPointer( false );

    if ( previousByteIsNewLine ) {
      // One case is left to rule out: a windows style "\r\n" with our start position between the two chars.
      // Then the line still has to be skipped; otherwise it is kept.
      return data.newLineFound();
    }
  } catch ( IOException e ) {
    // NOTE(review): the error is only printed and the method falls through to "skip the row" - confirm that
    // this best-effort behaviour is intended.
    e.printStackTrace();
  } finally {
    try {
      // move the channel position forward by one again, compensating for the step back above
      data.fc.position( data.fc.position() + 1 );
    } catch ( IOException e ) {
      // nothing to do here
    }
  }

  return true;
}

/**
* Read a single row of data from the file...
*
Expand Down Expand Up @@ -657,6 +702,7 @@ private Object[] readOneRow( boolean skipRow, boolean ignoreEnclosures ) throws
}
}


public boolean init( StepMetaInterface smi, StepDataInterface sdi ) {
meta = (CsvInputMeta) smi;
data = (CsvInputData) sdi;
Expand Down
15 changes: 12 additions & 3 deletions engine/src/org/pentaho/di/trans/steps/csvinput/CsvInputData.java
Expand Up @@ -2,7 +2,7 @@
*
* Pentaho Data Integration
*
* Copyright (C) 2002-2013 by Pentaho : http://www.pentaho.com
* Copyright (C) 2002-2016 by Pentaho : http://www.pentaho.com
*
*******************************************************************************
*
Expand Down Expand Up @@ -196,9 +196,18 @@ boolean resizeBufferIfNeeded() throws IOException {
* In case we get an error reading from the input file.
*/
boolean moveEndBufferPointer() throws IOException {
  // Delegate only: the overload advances endBuffer and, because of the 'true' flag, totalBytesRead as well.
  // Incrementing them here too would advance both counters twice per call.
  return moveEndBufferPointer( true );
}

/**
 * Advances the end-of-buffer pointer by one position and then calls {@code resizeBufferIfNeeded()}.
 *
 * Use with great care: advancing the pointer without also counting the byte in {@code totalBytesRead}
 * can lead to data corruption.
 *
 * @param increaseTotalBytes
 *          when {@code true}, {@code totalBytesRead} is incremented as well; when {@code false} only the
 *          pointer moves
 * @return the value returned by {@code resizeBufferIfNeeded()}
 * @throws IOException
 *           propagated from {@code resizeBufferIfNeeded()}
 */
boolean moveEndBufferPointer( boolean increaseTotalBytes ) throws IOException {
  if ( increaseTotalBytes ) {
    // count the byte as consumed by this step
    totalBytesRead++;
  }
  endBuffer++;
  return resizeBufferIfNeeded();
}

Expand Down

0 comments on commit b1cd03f

Please sign in to comment.