Skip to content

Commit

Permalink
Merge pull request #888 from rax-maas/es-rest-helper-instrumentation
Browse files Browse the repository at this point in the history
Elastic Search Rest helper Http Post Logic Refactor
  • Loading branch information
zzantozz committed Jul 20, 2022
2 parents 9b9e640 + 0afde7c commit fe5b71f
Showing 1 changed file with 73 additions and 155 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@
import org.apache.http.Header;
import org.apache.http.HttpEntity;
import org.apache.http.HttpStatus;
import org.apache.http.client.methods.*;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.ContentType;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
Expand All @@ -35,6 +37,10 @@ public class ElasticsearchRestHelper {
private int numberOfElasticsearchEndpoints;
private int MAX_CALL_COUNT = 10;
private int MAX_RESULT_LIMIT = Configuration.getInstance().getIntegerProperty(CoreConfig.MAX_DISCOVERY_RESULT_SIZE);
private static final Header defaultHttpHeaders[] = {
new BasicHeader("Accept", "application/json"),
new BasicHeader("Content-Type", "application/json")
};

/**
* Gets an instance of the ES rest helper for use in main source.
Expand Down Expand Up @@ -92,51 +98,11 @@ private void initializeBaseUrlCollection(String[] endpoints) {
}

public String fetchEvents(String tenantId, Map<String, List<String>> query) throws IOException {
String tempUrl = String.format("%s/%s/%s/_search?routing=%s&size=%d", getNextBaseUrl(),
EventElasticSearchIO.EVENT_INDEX, EventElasticSearchIO.ES_TYPE, tenantId, MAX_RESULT_LIMIT);
String url = String.format("%s/%s/_search?routing=%s&size=%d", EventElasticSearchIO.EVENT_INDEX,
EventElasticSearchIO.ES_TYPE, tenantId, MAX_RESULT_LIMIT);

String queryDslString = getDslString(tenantId, query);
HttpEntity httpEntity = new NStringEntity(queryDslString, ContentType.APPLICATION_JSON);

Queue<String> callQ = new LinkedList<>();
callQ.add(tempUrl);
int callCount = 0;
while(!callQ.isEmpty() && callCount < MAX_CALL_COUNT) {
callCount++;
String url = callQ.remove();
logger.info("Using url [{}]", url);

HttpPost httpPost = new HttpPost(url);
httpPost.setHeaders(getHeaders());

CloseableHttpResponse response = null;

try {
httpPost.setEntity(httpEntity);
response = closeableHttpClient.execute(httpPost);
String str = EntityUtils.toString(response.getEntity());
EntityUtils.consume(response.getEntity());
return str;
} catch (Exception e) {
if(response == null){
logger.error("fetchEvents failed with message: {}", e.getMessage());
url = String.format("%s/%s/%s/_search?routing=%s&size=%d", getNextBaseUrl(),
EventElasticSearchIO.EVENT_INDEX, EventElasticSearchIO.ES_TYPE,
tenantId, MAX_RESULT_LIMIT);
callQ.add(url);
}
else {
logger.error("fetchEvents failed with status code: {} and exception message: {}",
response.getStatusLine().getStatusCode(), e.getMessage());
}
} finally {
if(response != null) {
response.close();
}
}
}

return "";
return executePost("%s/"+url, queryDslString, "fetchEvents").response;
}

private String getDslString(String tenantId, Map<String, List<String>> query) {
Expand Down Expand Up @@ -204,7 +170,7 @@ public String fetchTokenDocuments(String[] indices, String tenantId, String quer
String temp = String.format("%s/_search?routing=%s&size=%d", multiIndexString, tenantId, MAX_RESULT_LIMIT);
String urlFormat = "%s/" + temp;

return fetchDocs(queryDslString, urlFormat);
return executePost(urlFormat, queryDslString, "fetchTokenDocuments").response;
}

public String fetchDocuments(String indexName, String documentType, String tenantId, String queryDslString) throws IOException {
Expand All @@ -213,51 +179,7 @@ public String fetchDocuments(String indexName, String documentType, String tenan
indexName, documentType, tenantId, MAX_RESULT_LIMIT);
String urlFormat = "%s/" + temp;

return fetchDocs(queryDslString, urlFormat);
}

private String fetchDocs(String queryDslString, String urlFormat) throws IOException {
String tempUrl = String.format(urlFormat, getNextBaseUrl());

Queue<String> callQ = new LinkedList<>();
callQ.add(tempUrl);
int callCount = 0;
while(!callQ.isEmpty() && callCount < MAX_CALL_COUNT) {
callCount++;
String url = callQ.remove();

logger.debug("Using url [{}]", url);
HttpPost httpPost = new HttpPost(url);
httpPost.setHeaders(getHeaders());
HttpEntity httpEntity = new NStringEntity(queryDslString, ContentType.APPLICATION_JSON);
httpPost.setEntity(httpEntity);

CloseableHttpResponse response = null;

try {
response = closeableHttpClient.execute(httpPost);
HttpEntity entity = response.getEntity();
String str = EntityUtils.toString(entity);
EntityUtils.consume(entity);
return str;
} catch (Exception e) {
if(response == null){
logger.error("fetchDocs failed with message: {}", e.getMessage());
url = String.format(urlFormat, getNextBaseUrl());
callQ.add(url);
}
else {
logger.error("fetch failed with status code: {} and exception message: {}",
response.getStatusLine().getStatusCode(), e.getMessage());
}
} finally {
if(response != null) {
response.close();
}
}
}

return "";
return executePost(urlFormat, queryDslString, "fetchDocuments").response;
}

private String getQueryDslString(String tenantId, List<String> queries){
Expand Down Expand Up @@ -499,59 +421,10 @@ private String getUnit(Metric metric) {

//If index() fails for whatever reason, it always throws IOException because indexing failed for Elasticsearch.
public void index(final String urlFormat, final String bulkString) throws IOException {
String tempUrl = String.format(urlFormat, getNextBaseUrl());
HttpEntity entity = new NStringEntity(bulkString, ContentType.APPLICATION_JSON);
int statusCode = 0;

/*
Here I am using Queue to keep a round-robin selection of next base url. If current base URL fails for
whatever reason (with response == null), then in catch block, I am enqueueing the next base URL so that
in next iteration call picks up the new URL. If URL works, queue will be empty and loop will break out.
*/
Queue<String> callQ = new LinkedList<>();
callQ.add(tempUrl);
int callCount = 0;
while(!callQ.isEmpty() && callCount < MAX_CALL_COUNT) {
callCount++;
String url = callQ.remove();
logger.debug("Using url [{}]", url);
HttpPost httpPost = new HttpPost(url);
httpPost.setHeaders(getHeaders());
httpPost.setEntity(entity);

CloseableHttpResponse response = null;

try {
logger.debug("ElasticsearchRestHelper.index Thread name in use: [{}]", Thread.currentThread().getName());
response = closeableHttpClient.execute(httpPost);

statusCode = response.getStatusLine().getStatusCode();
String str = EntityUtils.toString(response.getEntity());
EntityUtils.consume(response.getEntity());

if (statusCode != HttpStatus.SC_OK && statusCode != HttpStatus.SC_CREATED) {
logger.error("index method failed with status code: {} and error: {}", statusCode, str);
}
}
catch (Exception e) {
if(response == null){
logger.error("index method failed with message: {}", e.getMessage());
url = String.format(urlFormat, getNextBaseUrl());
callQ.add(url);
}
else {
logger.error("index method failed with status code: {} and exception message: {}",
statusCode, e.getMessage());
}
} finally {
if(response != null) {
response.close();
}
}
ExecuteResponse response = executePost(urlFormat, bulkString, "index");
if(response.statusCode != HttpStatus.SC_OK && response.statusCode != HttpStatus.SC_CREATED) {
throw new IOException("Elasticsearch indexing failed with status code: [" + response.statusCode + "]");
}

if(statusCode != HttpStatus.SC_OK && statusCode != HttpStatus.SC_CREATED)
throw new IOException("Elasticsearch indexing failed with status code: [" + statusCode + "]");
}

private String getTermQueryString(String key, String value){
Expand Down Expand Up @@ -585,19 +458,6 @@ private String getBoolQueryString(String mustValueString, String shouldValueStri
mustValueString, shouldValueString);
}

private Header[] getHeaders(){
Map<String, String> headersMap = new HashMap<>();
headersMap.put("Accept", "application/json");
headersMap.put("Content-Type", "application/json");

Header[] headers = new Header[headersMap.size()];
int i = 0;
for(String key : headersMap.keySet()){
headers[i++] = new BasicHeader(key, headersMap.get(key));
}
return headers;
}

@VisibleForTesting
public int refreshIndex(String indexName) throws IOException {
String url = String.format("http://%s/%s/_refresh", baseUrlArray[0], indexName);
Expand All @@ -615,4 +475,62 @@ public int refreshIndex(String indexName) throws IOException {

return response.getStatusLine().getStatusCode();
}

/*
Here I am using Queue to keep a round-robin selection of next base url. If current base URL fails for
whatever reason (with response == null), then in catch block, I am enqueueing the next base URL so that
in next iteration call picks up the new URL. If URL works, queue will be empty and loop will break out.
*/
private ExecuteResponse executePost(String urlFormat, String queryDslString, String methodName) throws IOException {
String tempUrl = String.format(urlFormat, getNextBaseUrl());
Queue<String> callQ = new LinkedList<>();
callQ.add(tempUrl);
int callCount = 0;
int responseCode = 0;
while (!callQ.isEmpty() && callCount < MAX_CALL_COUNT) {
callCount++;
String url = callQ.remove();

logger.debug("Using url [{}]", url);
HttpPost httpPost = new HttpPost(url);
httpPost.setHeaders(defaultHttpHeaders);
HttpEntity httpEntity = new NStringEntity(queryDslString, ContentType.APPLICATION_JSON);
httpPost.setEntity(httpEntity);
CloseableHttpResponse response = null;

try {
response = closeableHttpClient.execute(httpPost);
responseCode = response.getStatusLine().getStatusCode();
HttpEntity entity = response.getEntity();
String str = EntityUtils.toString(entity);
EntityUtils.consume(entity);
return new ExecuteResponse(str, responseCode);
} catch (Exception e) {
if (response == null) {
logger.error("{} failed with message: {}", methodName, e.getMessage());
url = String.format(urlFormat, getNextBaseUrl());
callQ.add(url);
} else {
logger.error("{} failed with status code: {} and exception message: {}", methodName,
response.getStatusLine().getStatusCode(), e.getMessage());
}
} finally {
if (response != null) {
response.close();
}
}
}

return new ExecuteResponse("", responseCode);
}

private static class ExecuteResponse {
private String response;
private int statusCode;

public ExecuteResponse(String response, int statusCode) {
this.response = response;
this.statusCode = statusCode;
}
}
}

0 comments on commit fe5b71f

Please sign in to comment.