Skip to content

Commit

Permalink
feat: Merge pull request #782 from pelias/add-elasticsearch-retries
Browse files Browse the repository at this point in the history
Add elasticsearch retries
  • Loading branch information
trescube committed Jan 25, 2017
2 parents f034fc5 + 62a2aa8 commit dec06da
Show file tree
Hide file tree
Showing 22 changed files with 1,231 additions and 697 deletions.
2 changes: 1 addition & 1 deletion app.js
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ var legacy = require('./routes/legacy');
legacy.addRoutes(app, peliasConfig.api);

var v1 = require('./routes/v1');
v1.addRoutes(app, peliasConfig.api);
v1.addRoutes(app, peliasConfig);

/** ----------------------- error middleware ----------------------- **/

Expand Down
86 changes: 62 additions & 24 deletions controller/place.js
Original file line number Diff line number Diff line change
@@ -1,44 +1,82 @@
var service = { mget: require('../service/mget') };
var logger = require('pelias-logger').get('api');
'use strict';

function setup( config, backend ){
const _ = require('lodash');
const retry = require('retry');

// allow overriding of dependencies
backend = backend || require('../src/backend');
const mgetService = require('../service/mget');
const logger = require('pelias-logger').get('api');

function controller( req, res, next ){
function requestHasErrors(request) {
return _.get(request, 'errors', []).length > 0;
}

// do not run controller when a request
// validation error has occurred.
if( req.errors && req.errors.length ){
function isRequestTimeout(err) {
return _.get(err, 'status') === 408;
}

function setup( apiConfig, esclient ){
function controller( req, res, next ){
// do not run controller when a request validation error has occurred.
if (requestHasErrors(req)){
return next();
}

var query = req.clean.ids.map( function(id) {
// options for retry
// maxRetries is from the API config with default of 3
// factor of 1 means that each retry attempt will esclient requestTimeout
const operationOptions = {
retries: _.get(apiConfig, 'requestRetries', 3),
factor: 1,
minTimeout: _.get(esclient, 'transport.requestTimeout')
};

// setup a new operation
const operation = retry.operation(operationOptions);

const cmd = req.clean.ids.map( function(id) {
return {
_index: config.indexName,
_index: apiConfig.indexName,
_type: id.layers,
_id: id.id
};
});

logger.debug( '[ES req]', query );
logger.debug( '[ES req]', cmd );

service.mget( backend, query, function( err, docs ) {
console.log('err:' + err);
operation.attempt((currentAttempt) => {
mgetService( esclient, cmd, function( err, docs ) {
// returns true if the operation should be attempted again
// (handles bookkeeping of maxRetries)
// only consider for status 408 (request timeout)
if (isRequestTimeout(err) && operation.retry(err)) {
logger.info(`request timed out on attempt ${currentAttempt}, retrying`);
return;
}

// error handler
if( err ){
req.errors.push( err );
}
// set response data
else {
res.data = docs;
}
logger.debug('[ES response]', docs);
// error handler
if( err ){
if (_.isObject(err) && err.message) {
req.errors.push( err.message );
} else {
req.errors.push( err );
}
}
// set response data
else {
// log that a retry was successful
// most requests succeed on first attempt so this declutters log files
if (currentAttempt > 1) {
logger.info(`succeeded on retry ${currentAttempt-1}`);
}

next();
res.data = docs;
}
logger.debug('[ES response]', docs);

next();
});
});

}

return controller;
Expand Down
115 changes: 79 additions & 36 deletions controller/search.js
Original file line number Diff line number Diff line change
@@ -1,75 +1,118 @@
var _ = require('lodash');
'use strict';

var service = { search: require('../service/search') };
var logger = require('pelias-logger').get('api');
var logging = require( '../helper/logging' );
const _ = require('lodash');

function setup( config, backend, query ){
const searchService = require('../service/search');
const logger = require('pelias-logger').get('api');
const logging = require( '../helper/logging' );
const retry = require('retry');

// allow overriding of dependencies
backend = backend || require('../src/backend');
query = query || require('../query/search');
function requestHasErrors(request) {
return _.get(request, 'errors', []).length > 0;
}

function responseHasData(response) {
return _.get(response, 'data', []).length > 0;
}

function isRequestTimeout(err) {
return _.get(err, 'status') === 408;
}

function setup( apiConfig, esclient, query ){
function controller( req, res, next ){
// do not run controller when a request
// validation error has occurred.
if( req.errors && req.errors.length ){
if (requestHasErrors(req)) {
return next();
}

// do not run controller if there are already results
// this was added during libpostal integration. if the libpostal parse/query
// doesn't return anything then fallback to old search-engine-y behavior
if (res && res.hasOwnProperty('data') && res.data.length > 0) {
if (responseHasData(res)) {
return next();
}

var cleanOutput = _.cloneDeep(req.clean);
let cleanOutput = _.cloneDeep(req.clean);
if (logging.isDNT(req)) {
cleanOutput = logging.removeFields(cleanOutput);
}
// log clean parameters for stats
logger.info('[req]', 'endpoint=' + req.path, cleanOutput);

var renderedQuery = query(req.clean);
const renderedQuery = query(req.clean);

// if there's no query to call ES with, skip the service
if (_.isUndefined(renderedQuery)) {
return next();
}

// backend command
var cmd = {
index: config.indexName,
// options for retry
// maxRetries is from the API config with default of 3
// factor of 1 means that each retry attempt will esclient requestTimeout
const operationOptions = {
retries: _.get(apiConfig, 'requestRetries', 3),
factor: 1,
minTimeout: _.get(esclient, 'transport.requestTimeout')
};

// setup a new operation
const operation = retry.operation(operationOptions);

// elasticsearch command
const cmd = {
index: apiConfig.indexName,
searchType: 'dfs_query_then_fetch',
body: renderedQuery.body
};

logger.debug( '[ES req]', cmd );

// query backend
service.search( backend, cmd, function( err, docs, meta ){
operation.attempt((currentAttempt) => {
// query elasticsearch
searchService( esclient, cmd, function( err, docs, meta ){
// returns true if the operation should be attempted again
// (handles bookkeeping of maxRetries)
// only consider for status 408 (request timeout)
if (isRequestTimeout(err) && operation.retry(err)) {
logger.info(`request timed out on attempt ${currentAttempt}, retrying`);
return;
}

// error handler
if( err ){
if (_.isObject(err) && err.message) {
req.errors.push( err.message );
} else {
req.errors.push( err );
// error handler
if( err ){
if (_.isObject(err) && err.message) {
req.errors.push( err.message );
} else {
req.errors.push( err );
}
}
}
// set response data
else {
res.data = docs;
res.meta = meta || {};
// store the query_type for subsequent middleware
res.meta.query_type = renderedQuery.type;

logger.info(`[controller:search] [queryType:${renderedQuery.type}] [es_result_count:` +
(res.data && res.data.length ? res.data.length : 0));
}
logger.debug('[ES response]', docs);
next();
// set response data
else {
// log that a retry was successful
// most requests succeed on first attempt so this declutters log files
if (currentAttempt > 1) {
logger.info(`succeeded on retry ${currentAttempt-1}`);
}

res.data = docs;
res.meta = meta || {};
// store the query_type for subsequent middleware
res.meta.query_type = renderedQuery.type;

const messageParts = [
'[controller:search]',
`[queryType:${renderedQuery.type}]`,
`[es_result_count:${_.get(res, 'data', []).length}]`
];

logger.info(messageParts.join(' '));
}
logger.debug('[ES response]', docs);
next();
});

});

}
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@
"geojson": "^0.4.0",
"geojson-extent": "^0.3.1",
"geolib": "^2.0.18",
"geopipes-elasticsearch-backend": "^0.2.0",
"iso3166-1": "^0.2.3",
"joi": "^10.1.0",
"lodash": "^4.5.0",
Expand All @@ -59,6 +58,7 @@
"pelias-model": "4.4.0",
"pelias-query": "8.12.0",
"pelias-text-analyzer": "1.7.0",
"retry": "^0.10.1",
"stats-lite": "2.0.3",
"superagent": "^3.2.1",
"through2": "^2.0.3"
Expand Down
Loading

0 comments on commit dec06da

Please sign in to comment.