Skip to content

Commit

Permalink
issue #175 : refactor record reading code
Browse files Browse the repository at this point in the history
  • Loading branch information
fmbenhassine committed Jun 17, 2016
1 parent 38ac4a9 commit 351b85a
Showing 1 changed file with 33 additions and 27 deletions.
60 changes: 33 additions & 27 deletions easybatch-core/src/main/java/org/easybatch/core/job/JobImpl.java
Expand Up @@ -32,7 +32,6 @@
import org.easybatch.core.reader.RecordReader;
import org.easybatch.core.reader.RecordReadingException;
import org.easybatch.core.record.Record;
import org.easybatch.core.util.Utils;

import java.util.ArrayList;
import java.util.logging.Level;
Expand Down Expand Up @@ -105,33 +104,11 @@ public JobReport call() {
}

//read next record
// TODO clean record reading code
Record currentRecord = null;
int readAttempts = 0;
int maxAttempts = parameters.getRetryPolicy().getMaxAttempts();
long backOffDelay = parameters.getRetryPolicy().getBackOffDelay();
while(readAttempts < maxAttempts) {
try {
readAttempts++;
eventManager.fireBeforeRecordReading();
currentRecord = recordReader.readNextRecord();
eventManager.fireAfterRecordReading(currentRecord);
recordCount++;
readAttempts = maxAttempts;
} catch (RecordReadingException e) {
eventManager.fireOnRecordReadingException(e);
LOGGER.log(Level.SEVERE, "Unable to read next record", e);
report.getMetrics().setLastError(e);
if (readAttempts >= maxAttempts) {
LOGGER.log(Level.WARNING, "Unable to read next record after {0} attempt(s), aborting job", maxAttempts);
report.setStatus(JobStatus.ABORTED);
report.getMetrics().setEndTime(System.currentTimeMillis());
return report;
}
Thread.sleep(backOffDelay);
LOGGER.log(Level.INFO, "Waiting for {0}s before retrying to read next record", toSeconds(backOffDelay));
}
Record currentRecord = readNextRecord();
if (currentRecord == null) {
return report;
}
recordCount++;

//Skip records if any
if (shouldSkipRecord(recordCount)) {
Expand Down Expand Up @@ -165,6 +142,35 @@ public JobReport call() {

}

private Record readNextRecord() throws InterruptedException {
Record currentRecord;
int readAttempts = 0;
int maxAttempts = parameters.getRetryPolicy().getMaxAttempts();
long backOffDelay = parameters.getRetryPolicy().getBackOffDelay();
while(readAttempts < maxAttempts) {
try {
readAttempts++;
eventManager.fireBeforeRecordReading();
currentRecord = recordReader.readNextRecord();
eventManager.fireAfterRecordReading(currentRecord);
return currentRecord;
} catch (RecordReadingException e) {
eventManager.fireOnRecordReadingException(e);
LOGGER.log(Level.SEVERE, "Unable to read next record", e);
report.getMetrics().setLastError(e);
if (readAttempts >= maxAttempts) {
LOGGER.log(Level.WARNING, "Unable to read next record after {0} attempt(s), aborting job", maxAttempts);
report.setStatus(JobStatus.ABORTED);
report.getMetrics().setEndTime(System.currentTimeMillis());
return null;
}
Thread.sleep(backOffDelay);
LOGGER.log(Level.INFO, "Waiting for {0}s before retrying to read next record", toSeconds(backOffDelay));
}
}
return null;
}

private boolean shouldSkipRecord(long recordCount) {
return recordCount <= parameters.getSkip();
}
Expand Down

0 comments on commit 351b85a

Please sign in to comment.