Skip to content

Commit

Permalink
parallel ingest bug fix
Browse files Browse the repository at this point in the history
  • Loading branch information
jortiz16 committed Aug 19, 2016
1 parent 2918e59 commit 0c7b6c3
Showing 1 changed file with 33 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ public class CSVFileScanFragment extends LeafOperator {
private final long partitionEndByteRange;

private long adjustedStartByteRange;
private int byteOffsetFromTruncatedRowAtStart = 0;
private InputStream partitionInputStream;
private CSVRecord record;
private boolean onLastRow;
Expand Down Expand Up @@ -269,7 +270,9 @@ record = iterator.next();
long characterPositionAtBeginningOfRecord =
(record == null) ? 0 : record.getCharacterPosition();
InputStream completePartitionStream =
source.getInputStream(adjustedStartByteRange, finalBytePositionFound);
source.getInputStream(
adjustedStartByteRange + byteOffsetFromTruncatedRowAtStart,
finalBytePositionFound);
BufferedReader reader =
new BufferedReader(new InputStreamReader(completePartitionStream));
reader.skip(characterPositionAtBeginningOfRecord);
Expand Down Expand Up @@ -390,12 +393,13 @@ protected void init(final ImmutableMap<String, Object> execEnvVars) throws DbExc
*/
if (partitionStartByteRange != 0) {
int firstChar = partitionInputStream.read();
int byteOffset = 1;
byteOffsetFromTruncatedRowAtStart = 1;

if (firstChar != '\n' && firstChar != '\r') {
boolean newLineFound = false;
while (!newLineFound) {
int currentChar = partitionInputStream.read();
byteOffset++;
byteOffsetFromTruncatedRowAtStart++;
if (currentChar == '\n' || currentChar == '\r' || currentChar == -1) {
newLineFound = true;
/*
Expand All @@ -406,19 +410,26 @@ protected void init(final ImmutableMap<String, Object> execEnvVars) throws DbExc
flagAsIncomplete = true;
} else if (currentChar == '\r') {
currentChar = partitionInputStream.read();
byteOffsetFromTruncatedRowAtStart++;
if (currentChar != '\n') {
byteOffsetFromTruncatedRowAtStart--;
partitionInputStream =
source.getInputStream(
adjustedStartByteRange + byteOffset, partitionEndByteRange);
adjustedStartByteRange + byteOffsetFromTruncatedRowAtStart,
partitionEndByteRange);
}
}
}
}
} else if (firstChar == '\r') {
int currentChar = partitionInputStream.read();
byteOffsetFromTruncatedRowAtStart++;
if (currentChar != '\n') {
byteOffsetFromTruncatedRowAtStart--;
partitionInputStream =
source.getInputStream(adjustedStartByteRange + byteOffset, partitionEndByteRange);
source.getInputStream(
adjustedStartByteRange + byteOffsetFromTruncatedRowAtStart,
partitionEndByteRange);
}
}
}
Expand All @@ -430,6 +441,23 @@ protected void init(final ImmutableMap<String, Object> execEnvVars) throws DbExc
new BufferedReader(new InputStreamReader(partitionInputStream)),
CSVFormat.newFormat(delimiter).withQuote(quote).withEscape(escape));
iterator = parser.iterator();

try {
if (!iterator.hasNext()) {
flagAsIncomplete = true;
}
} catch (Exception e) {
/*
* FIX ME: If we hit an exception for a malformed row (in case of quotes for example), we mark this as the
* last row
*/
if (e.getMessage() != null && e.getMessage().contains(truncatedQuoteErrorMessage)) {
onLastRow = true;
} else {
throw e;
}
}

for (int i = 0; i < numberOfSkippedLines; i++) {
iterator.next();
}
Expand Down

0 comments on commit 0c7b6c3

Please sign in to comment.