[GOBBLIN-284] Add retry in SalesforceExtractor to handle transient network errors
Closes apache#2137 from htran1/salesforce_fetch_fixes
htran1 authored and zxliucmu committed Nov 16, 2017
1 parent 7662b25 commit 9bc1500
Showing 1 changed file with 140 additions and 43 deletions.
@@ -108,6 +108,8 @@ public class SalesforceExtractor extends RestApiExtractor {
private static final int MAX_RETRY_INTERVAL_SECS = 600;
// avoid using too many bulk API calls by allowing PK chunking only if the configured max partitions is <= this
private static final int PK_CHUNKING_MAX_PARTITIONS_LIMIT = 3;
private static final String FETCH_RETRY_LIMIT_KEY = "salesforce.fetchRetryLimit";
private static final int DEFAULT_FETCH_RETRY_LIMIT = 5;

private boolean pullStatus = true;
private String nextUrl;
@@ -124,10 +126,13 @@ public class SalesforceExtractor extends RestApiExtractor {
private boolean newBulkResultSet = true;
private int bulkRecordCount = 0;
private int prevBulkRecordCount = 0;
private List<String> csvRecord;

private final boolean pkChunking;
private final int pkChunkingSize;
private final SalesforceConnector sfConnector;
private final int fetchRetryLimit;
private final int batchSize;

public SalesforceExtractor(WorkUnitState state) {
super(state);
@@ -149,6 +154,13 @@ public SalesforceExtractor(WorkUnitState state) {
this.pkChunkingSize =
Math.max(MIN_PK_CHUNKING_SIZE,
Math.min(MAX_PK_CHUNKING_SIZE, state.getPropAsInt(PK_CHUNKING_SIZE_KEY, DEFAULT_PK_CHUNKING_SIZE)));

// Get batch size from .pull file
int tmpBatchSize = state.getPropAsInt(ConfigurationKeys.SOURCE_QUERYBASED_FETCH_SIZE,
ConfigurationKeys.DEFAULT_SOURCE_FETCH_SIZE);

this.batchSize = tmpBatchSize == 0 ? ConfigurationKeys.DEFAULT_SOURCE_FETCH_SIZE : tmpBatchSize;
this.fetchRetryLimit = state.getPropAsInt(FETCH_RETRY_LIMIT_KEY, DEFAULT_FETCH_RETRY_LIMIT);
}

@Override
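
For reference, a hedged sketch of a .pull configuration wiring up both knobs read by the constructor above. The key salesforce.fetchRetryLimit is defined in this commit; source.querybased.fetch.size is assumed to be the property behind ConfigurationKeys.SOURCE_QUERYBASED_FETCH_SIZE, and the values are illustrative only:

# cap on reconnect attempts per result batch (falls back to DEFAULT_FETCH_RETRY_LIMIT = 5 when unset)
salesforce.fetchRetryLimit=5
# extractor-internal batch size; 0 falls back to ConfigurationKeys.DEFAULT_SOURCE_FETCH_SIZE
source.querybased.fetch.size=5000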
@@ -581,7 +593,8 @@ public Iterator<JsonElement> getRecordSetFromSourceApi(String schema, String ent

// Get data from input stream
// If bulk load is not finished, get data from the stream
if (!this.isBulkJobFinished()) {
// Skip empty result sets since they will cause the extractor to terminate early
while (!this.isBulkJobFinished() && (rs == null || rs.isEmpty())) {
rs = getBulkData();
}

@@ -774,6 +787,125 @@ private List<BatchIdAndResultId> getQueryResultIds(String entity, List<Predicate
}
}

/**
* Get a buffered reader wrapping the query result stream for the result with the specified index
* @param index index into the {@link #bulkResultIdList}
* @return a {@link BufferedReader}
* @throws AsyncApiException
*/
private BufferedReader getBulkBufferedReader(int index) throws AsyncApiException {
return new BufferedReader(new InputStreamReader(
this.bulkConnection.getQueryResultStream(this.bulkJob.getId(), this.bulkResultIdList.get(index).getBatchId(),
this.bulkResultIdList.get(index).getResultId()), ConfigurationKeys.DEFAULT_CHARSET_ENCODING));
}

/**
* Fetch records into a {@link RecordSetList} up to the configured batch size {@link #batchSize}. This batch is not
* the entire Salesforce result batch. It is an internal batch in the extractor for buffering a subset of the result
* stream that comes from a Salesforce batch for more efficient processing.
* @param rs the record set to fetch into
* @param initialRecordCount Initial record count to use. This should correspond to the number of records already in rs.
* This is used to limit the number of records returned in rs to {@link #batchSize}.
* @throws DataRecordException
* @throws IOException
*/
private void fetchResultBatch(RecordSetList<JsonElement> rs, int initialRecordCount)
throws DataRecordException, IOException {
int recordCount = initialRecordCount;

// Stream the resultset through CSV reader to identify columns in each record
InputStreamCSVReader reader = new InputStreamCSVReader(this.bulkBufferedReader);

// Get header if it is first run of a new resultset
if (this.isNewBulkResultSet()) {
this.bulkRecordHeader = reader.nextRecord();
this.bulkResultColumCount = this.bulkRecordHeader.size();
this.setNewBulkResultSet(false);
}

// Get record from CSV reader stream
while ((this.csvRecord = reader.nextRecord()) != null) {
// Convert CSV record to JsonObject
JsonObject jsonObject = Utils.csvToJsonObject(this.bulkRecordHeader, this.csvRecord, this.bulkResultColumCount);
rs.add(jsonObject);
recordCount++;
this.bulkRecordCount++;

// Insert records in record set until it reaches the batch size
if (recordCount >= batchSize) {
log.info("Total number of records processed so far: " + this.bulkRecordCount);
break;
}
}
}
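
The loop above leans on Utils.csvToJsonObject, which this diff does not show. A minimal sketch of what such a helper might do, assuming the header and record lists align positionally (hypothetical code, not part of the commit):

import com.google.gson.JsonObject;
import java.util.List;

public class CsvJsonSketch {
  // Pair each header column with the value at the same position in the CSV record.
  static JsonObject csvToJsonObject(List<String> header, List<String> record, int columnCount) {
    JsonObject json = new JsonObject();
    for (int i = 0; i < columnCount; i++) {
      json.addProperty(header.get(i), record.get(i));
    }
    return json;
  }
}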

/**
* Reinitialize the state of {@link #bulkBufferedReader} to handle network disconnects
* @throws IOException
* @throws AsyncApiException
*/
private void reinitializeBufferedReader() throws IOException, AsyncApiException {
// close the reader and get a new input stream to reconnect, working around intermittent network errors
this.bulkBufferedReader.close();
this.bulkBufferedReader = getBulkBufferedReader(this.bulkResultIdCount - 1);

// if the result set is partially processed then we need to skip over processed records
if (!isNewBulkResultSet()) {
List<String> lastCsvRecord = null;
InputStreamCSVReader reader = new InputStreamCSVReader(this.bulkBufferedReader);

// skip header
reader.nextRecord();

int recordsToSkip = this.bulkRecordCount - this.prevBulkRecordCount;
log.info("Skipping {} records on retry: ", recordsToSkip);

for (int i = 0; i < recordsToSkip; i++) {
lastCsvRecord = reader.nextRecord();
}

// make sure the last record processed before the error was the last record skipped so that the next
// unprocessed record is processed in the next call to fetchResultBatch()
if (recordsToSkip > 0) {
if (!this.csvRecord.equals(lastCsvRecord)) {
throw new RuntimeException("Repositioning after reconnecting did not point to the expected record");
}
}
}
}
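
A worked example of the skip arithmetic, with illustrative values that are not from the commit:

// Suppose the previous result set ended at record 10,000 and the stream
// dropped after record 12,345 of the current one:
int prevBulkRecordCount = 10_000; // count when this result set was opened
int bulkRecordCount = 12_345;     // count when the IOException hit
int recordsToSkip = bulkRecordCount - prevBulkRecordCount; // 2,345 rows to re-skip
// After skipping the CSV header, the loop re-reads 2,345 rows so the next
// nextRecord() call returns the first unprocessed record, and the equals()
// check confirms the reader stopped on the last record processed before the error.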

/**
* Fetch a result batch with retry for network errors
* @param rs the {@link RecordSetList} to fetch into
*/
private void fetchResultBatchWithRetry(RecordSetList<JsonElement> rs)
throws AsyncApiException, DataRecordException, IOException {
boolean success = false;
int retryCount = 0;
int recordCountBeforeFetch = this.bulkRecordCount;

do {
try {
// reinitialize the reader to establish a new connection to handle transient network errors
if (retryCount > 0) {
reinitializeBufferedReader();
}

// on retries there may already be records in rs, so pass the number of records as the initial count
fetchResultBatch(rs, this.bulkRecordCount - recordCountBeforeFetch);
success = true;
} catch (IOException e) {
if (retryCount < this.fetchRetryLimit) {
log.info("Exception while fetching data, retrying: " + e.getMessage(), e);
retryCount++;
} else {
log.error("Exception while fetching data: " + e.getMessage(), e);
throw e;
}
}
} while (!success);
}

/**
* Get data from the bulk api input stream
* @return record set with each record as a JsonObject
@@ -796,14 +928,12 @@ private RecordSet<JsonElement> getBulkData() throws DataRecordException {
if (this.bulkResultIdCount < this.bulkResultIdList.size()) {
log.info("Stream resultset for resultId:" + this.bulkResultIdList.get(this.bulkResultIdCount));
this.setNewBulkResultSet(true);
this.bulkBufferedReader =
new BufferedReader(
new InputStreamReader(
this.bulkConnection.getQueryResultStream(this.bulkJob.getId(),
this.bulkResultIdList.get(this.bulkResultIdCount).getBatchId(),
this.bulkResultIdList.get(this.bulkResultIdCount).getResultId()),
ConfigurationKeys.DEFAULT_CHARSET_ENCODING));

if (this.bulkBufferedReader != null) {
this.bulkBufferedReader.close();
}

this.bulkBufferedReader = getBulkBufferedReader(this.bulkResultIdCount);
this.bulkResultIdCount++;
this.prevBulkRecordCount = bulkRecordCount;
} else {
@@ -814,41 +944,8 @@ private RecordSet<JsonElement> getBulkData() throws DataRecordException {
}
}

// if Buffer stream has data then process the same

// Get batch size from .pull file
int batchSize = Utils.getAsInt(this.workUnitState.getProp(ConfigurationKeys.SOURCE_QUERYBASED_FETCH_SIZE));
if (batchSize == 0) {
batchSize = ConfigurationKeys.DEFAULT_SOURCE_FETCH_SIZE;
}

// Stream the resultset through CSV reader to identify columns in each record
InputStreamCSVReader reader = new InputStreamCSVReader(this.bulkBufferedReader);

// Get header if it is first run of a new resultset
if (this.isNewBulkResultSet()) {
this.bulkRecordHeader = reader.nextRecord();
this.bulkResultColumCount = this.bulkRecordHeader.size();
this.setNewBulkResultSet(false);
}

List<String> csvRecord;
int recordCount = 0;

// Get record from CSV reader stream
while ((csvRecord = reader.nextRecord()) != null) {
// Convert CSV record to JsonObject
JsonObject jsonObject = Utils.csvToJsonObject(this.bulkRecordHeader, csvRecord, this.bulkResultColumCount);
rs.add(jsonObject);
recordCount++;
this.bulkRecordCount++;

// Insert records in record set until it reaches the batch size
if (recordCount >= batchSize) {
log.info("Total number of records processed so far: " + this.bulkRecordCount);
break;
}
}
// fetch a batch of results with retry for network errors
fetchResultBatchWithRetry(rs);

} catch (Exception e) {
throw new DataRecordException("Failed to get records from salesforce; error - " + e.getMessage(), e);
