Skip to content

Commit

Permalink
fix for nextRecordTruncated corner case
Browse files Browse the repository at this point in the history
  • Loading branch information
jortiz16 committed Aug 17, 2016
1 parent 529f2d0 commit 0c3eb13
Show file tree
Hide file tree
Showing 2 changed files with 152 additions and 121 deletions.
254 changes: 133 additions & 121 deletions src/edu/washington/escience/myria/operator/CSVFileScanFragment.java
Original file line number Diff line number Diff line change
Expand Up @@ -179,20 +179,33 @@ public CSVFileScanFragment(
@Override
protected TupleBatch fetchNextReady() throws IOException, DbException {
long lineNumberBegin = lineNumber;
boolean truncatedQuotedField = false;
boolean nextRecordTruncated = false;

while ((buffer.numTuples() < TupleBatch.BATCH_SIZE) && !flagAsIncomplete) {
lineNumber++;
if (parser.isClosed()) {
break;
}

if (truncatedQuotedField) {
if (nextRecordTruncated) {
onLastRow = true;
}

if (!onLastRow) {
record = iterator.next();
try {
if (!onLastRow) {
record = iterator.next();
}
} catch (Exception e) {
/*
* FIX ME: If we hit an exception for a malformed row (in case of quotes for example), we mark this as the last
* row
*/
if (e.getMessage() != null
&& e.getMessage().contains("EOF reached before encapsulated token finished")) {
onLastRow = true;
} else {
throw e;
}
}

try {
Expand All @@ -201,144 +214,143 @@ record = iterator.next();
}
} catch (Exception e) {
/*
* FIX ME: If we hit an exception for a malformed row (in case of quotes for example), we read more bytes by
* marking this as the final line
* FIX ME: If we hit an exception for a malformed row (in case of quotes for example), we mark
* nextRecordTruncated as true
*/
if (e.getMessage() != null
&& e.getMessage().contains("EOF reached before encapsulated token finished")) {
truncatedQuotedField = true;
nextRecordTruncated = true;
} else {
throw e;
}
}

if (record != null) {
/*
* Here, if we are on the last row, we make sure to read the entire row until we either hit a new line or until we
* have read the entire file (this is for the case where a single worker might be reading a single large row that
* was split among other workers). If we're at the last row and the last worker is reading, we just mark this
* final line as finished.
*/
if (onLastRow && !finishedReadingLastRow && !isLastWorker) {
long trailingStartByte = partitionEndByteRange + 1;
long trailingEndByte = trailingStartByte + byteOverlap - 1;
long finalBytePositionFound = trailingStartByte;
boolean finalLineFound = false;

/*
* Here, if we are on the last row, we make sure to read the entire row until we either hit a new line or until
* we have read the entire file (this is for the case where a single worker might be reading a single large row
* that was split among other workers). If we're at the last row and the last worker is reading, we just mark
* this final line as finished.
*/
if (onLastRow && !finishedReadingLastRow && !isLastWorker) {
long trailingStartByte = partitionEndByteRange + 1;
long trailingEndByte = trailingStartByte + byteOverlap - 1;
long finalBytePositionFound = trailingStartByte;
boolean finalLineFound = false;

while (!finalLineFound) {
/*
* If we are within the max byte range, then keep checking for a new line. Otherwise, if we've reached the
* end of the file, mark finalLineFound as true.
*/
if (trailingEndByte < maxByteRange) {
InputStream trailingEndInputStream =
source.getInputStream(trailingStartByte, trailingEndByte);
int dataChar = trailingEndInputStream.read();
while (dataChar != -1) {
char currentChar = (char) dataChar;
if (currentChar == '\n' || currentChar == '\r') {
finalLineFound = true;
break;
}
dataChar = trailingEndInputStream.read();
finalBytePositionFound++;
while (!finalLineFound) {
/*
* If we are within the max byte range, then keep checking for a new line. Otherwise, if we've reached the end
* of the file, mark finalLineFound as true.
*/
if (trailingEndByte < maxByteRange) {
InputStream trailingEndInputStream =
source.getInputStream(trailingStartByte, trailingEndByte);
int dataChar = trailingEndInputStream.read();
while (dataChar != -1) {
char currentChar = (char) dataChar;
if (currentChar == '\n' || currentChar == '\r') {
finalLineFound = true;
break;
}
trailingEndInputStream.close();
} else {
finalLineFound = true;
finalBytePositionFound = maxByteRange;
dataChar = trailingEndInputStream.read();
finalBytePositionFound++;
}
trailingEndInputStream.close();
} else {
finalLineFound = true;
finalBytePositionFound = maxByteRange;
}

/*
* If we found the new line, then reset the parser for this line. Otherwise, increase the byte overlap and
* the trailing range.
*/
if (finalLineFound) {
long characterPositionAtBeginningOfRecord = record.getCharacterPosition();
InputStream completePartitionStream =
source.getInputStream(adjustedStartByteRange, finalBytePositionFound);
BufferedReader reader =
new BufferedReader(new InputStreamReader(completePartitionStream));
reader.skip(characterPositionAtBeginningOfRecord);
parser =
new CSVParser(
reader,
CSVFormat.newFormat(delimiter).withQuote(quote).withEscape(escape),
0,
0);
iterator = parser.iterator();
/*
* If we found the new line, then reset the parser for this line. Otherwise, increase the byte overlap and the
* trailing range.
*/
if (finalLineFound) {
long characterPositionAtBeginningOfRecord =
record == null ? 0 : record.getCharacterPosition();
InputStream completePartitionStream =
source.getInputStream(adjustedStartByteRange, finalBytePositionFound);
BufferedReader reader =
new BufferedReader(new InputStreamReader(completePartitionStream));
reader.skip(characterPositionAtBeginningOfRecord);
parser =
new CSVParser(
reader,
CSVFormat.newFormat(delimiter).withQuote(quote).withEscape(escape),
0,
0);
iterator = parser.iterator();
record = iterator.next();
if (nextRecordTruncated) {
record = iterator.next();
if (truncatedQuotedField) {
record = iterator.next();
}
finishedReadingLastRow = true;
} else {
byteOverlap *= 2;
trailingStartByte += byteOverlap;
trailingEndByte += byteOverlap;
}
nextRecordTruncated = true;
finishedReadingLastRow = true;
} else {
byteOverlap *= 2;
trailingStartByte += byteOverlap;
trailingEndByte += byteOverlap;
}
} else if (record.size() == schema.numColumns() && onLastRow && isLastWorker) {
finishedReadingLastRow = true;
}
} else if (record.size() == schema.numColumns() && onLastRow && isLastWorker) {
finishedReadingLastRow = true;
}

/*
* If we're on the last row, we check if we've finished reading the row completely.
*/
if (!onLastRow || (onLastRow && finishedReadingLastRow)) {
for (int column = 0; column < schema.numColumns(); ++column) {
String cell = record.get(column);
try {
switch (schema.getColumnType(column)) {
case BOOLEAN_TYPE:
if (Floats.tryParse(cell) != null) {
buffer.putBoolean(column, Floats.tryParse(cell) != 0);
} else if (BooleanUtils.toBoolean(cell)) {
buffer.putBoolean(column, Boolean.parseBoolean(cell));
}
break;
case DOUBLE_TYPE:
buffer.putDouble(column, Double.parseDouble(cell));
break;
case FLOAT_TYPE:
buffer.putFloat(column, Float.parseFloat(cell));
break;
case INT_TYPE:
buffer.putInt(column, Integer.parseInt(cell));
break;
case LONG_TYPE:
buffer.putLong(column, Long.parseLong(cell));
break;
case STRING_TYPE:
buffer.putString(column, cell);
break;
case DATETIME_TYPE:
buffer.putDateTime(column, DateTimeUtils.parse(cell));
break;
}
} catch (final IllegalArgumentException e) {
throw new DbException(
"Error parsing column "
+ column
+ " of row "
+ lineNumber
+ ", expected type: "
+ schema.getColumnType(column)
+ ", scanned value: "
+ cell,
e);
/*
* If we're on the last row, we check if we've finished reading the row completely.
*/
if (!onLastRow || (onLastRow && finishedReadingLastRow)) {
for (int column = 0; column < schema.numColumns(); ++column) {
String cell = record.get(column);
try {
switch (schema.getColumnType(column)) {
case BOOLEAN_TYPE:
if (Floats.tryParse(cell) != null) {
buffer.putBoolean(column, Floats.tryParse(cell) != 0);
} else if (BooleanUtils.toBoolean(cell)) {
buffer.putBoolean(column, Boolean.parseBoolean(cell));
}
break;
case DOUBLE_TYPE:
buffer.putDouble(column, Double.parseDouble(cell));
break;
case FLOAT_TYPE:
buffer.putFloat(column, Float.parseFloat(cell));
break;
case INT_TYPE:
buffer.putInt(column, Integer.parseInt(cell));
break;
case LONG_TYPE:
buffer.putLong(column, Long.parseLong(cell));
break;
case STRING_TYPE:
buffer.putString(column, cell);
break;
case DATETIME_TYPE:
buffer.putDateTime(column, DateTimeUtils.parse(cell));
break;
}
} catch (final IllegalArgumentException e) {
throw new DbException(
"Error parsing column "
+ column
+ " of row "
+ lineNumber
+ ", expected type: "
+ schema.getColumnType(column)
+ ", scanned value: "
+ cell,
e);
}
/*
* Once we finish reading the last row, we close the parser
*/
if (onLastRow) {
parser.close();
}
}
LOGGER.debug("Scanned {} input lines", lineNumber - lineNumberBegin);
/*
* Once we finish reading the last row, we close the parser
*/
if (onLastRow) {
parser.close();
}
}
LOGGER.debug("Scanned {} input lines", lineNumber - lineNumberBegin);
}
return buffer.popAny();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -641,4 +641,23 @@ public void quoteAtBeginning() throws Exception {
assertEquals(4, server.getDatasetStatus(relationKeyCoordinatorIngest).getNumTuples());
diffHelperMethod(relationKeyParallelIngest, relationKeyCoordinatorIngest, fileSchema);
}

/**
 * Ingests the single-column CSV at {@code s3://myria-test/quoteFirstWorker.csv} twice, once through the parallel
 * ingest path and once through the coordinator, expecting exactly one tuple from each, and then passes both
 * relations to {@code diffHelperMethod}.
 *
 * NOTE(review): judging by the test name, the file presumably holds a quoted field that gets truncated at the first
 * worker's partition boundary; confirm against the fixture in the bucket.
 *
 * @throws Exception if either ingest, a status lookup, or the final comparison fails
 */
@Test
public void truncatedQuoteFirstWorker() throws Exception {
  final String csvLocation = "s3://myria-test/quoteFirstWorker.csv";
  final Schema singleStringColumn = Schema.ofFields("w", Type.STRING_TYPE);

  /* Parallel ingest: comma delimiter, double-quote as the quote character, no escape character. */
  final RelationKey parallelKey = RelationKey.of("public", "adhoc", "quote_firstWorker_parallel");
  final AmazonS3Source s3Source = new AmazonS3Source(csvLocation, null, null);
  server.parallelIngestDataset(
      parallelKey, singleStringColumn, ',', '"', null, 0, s3Source, server.getAliveWorkers());
  assertEquals(1, server.getDatasetStatus(parallelKey).getNumTuples());

  /*
   * Ingest through the coordinator, spreading tuples round-robin over the workers.
   * NOTE(review): the FileScan below is given null for its quote argument while the parallel ingest uses '"';
   * confirm that this difference is intended.
   */
  final RelationKey coordinatorKey = RelationKey.of("public", "adhoc", "quote_firstWorker_coordinator");
  server.ingestDataset(
      coordinatorKey,
      server.getAliveWorkers(),
      null,
      new FileScan(new UriSource(csvLocation), singleStringColumn, ',', null, null, 0),
      new RoundRobinPartitionFunction(workerIDs.length));
  assertEquals(1, server.getDatasetStatus(coordinatorKey).getNumTuples());

  /* Hand both relations to the shared diff helper. */
  diffHelperMethod(parallelKey, coordinatorKey, singleStringColumn);
}
}

0 comments on commit 0c3eb13

Please sign in to comment.