Skip to content
Permalink
Browse files

Catch any ElasticsearchException, fail the job, and include the logge…

…d query in the response. Allow IndexNotFoundException and skip those indices in future hops.
  • Loading branch information...
davemoore- committed Aug 26, 2019
1 parent 32baed6 commit 3e96b6de8ad86429de76cd89a6b7fbc995d3a531
Showing with 91 additions and 37 deletions.
  1. +91 −37 src/main/java/io/zentity/resolution/Job.java
@@ -1,5 +1,6 @@
package io.zentity.resolution;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
@@ -14,16 +15,20 @@
import io.zentity.resolution.input.Term;
import io.zentity.resolution.input.value.StringValue;
import io.zentity.resolution.input.value.Value;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.search.SearchAction;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.DeprecationHandler;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.search.SearchModule;
import org.elasticsearch.search.builder.SearchSourceBuilder;

@@ -41,6 +46,7 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Pattern;

import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static io.zentity.common.Patterns.COLON;

public class Job {
@@ -72,15 +78,55 @@
private Map<String, Attribute> attributes = new TreeMap<>();
private NodeClient client;
private Map<String, Set<String>> docIds = new TreeMap<>();
private String error = null;
private List<String> hits = new ArrayList<>();
private int hop = 0;
private Set<String> missingIndices = new TreeSet<>();
private List<String> queries = new ArrayList<>();
private boolean ran = false;

public Job(NodeClient client) {
this.client = client;
}

public static String serializeElasticsearchException(ElasticsearchException e) throws IOException {
String cause = Strings.toString(e.toXContent(jsonBuilder().startObject(), ToXContent.EMPTY_PARAMS).endObject());
return "{\"error\":{\"root_cause\":[" + cause + "],\"type\":\"" + ElasticsearchException.getExceptionName(e) + "\",\"reason\":\"" + e.getMessage() + "\"},\"status\":" + e.status().getStatus() + "}";
}

public static String serializeLoggedQuery(Input input, int _hop, int _query, String indexName, String request, String response, List<String> resolvers, TreeMap<Integer, TreeMap<String, TreeMap>> resolversFilterTreeGrouped, List<String> termResolvers, TreeMap<String, TreeMap> termResolversFilterTree) throws JsonProcessingException {
List<String> filtersLoggedList = new ArrayList<>();
if (!resolvers.isEmpty() && !resolversFilterTreeGrouped.isEmpty()) {
List<String> attributesResolversSummary = new ArrayList<>();
for (String resolverName : resolvers) {
List<String> resolversAttributes = new ArrayList<>();
for (String attributeName : input.model().resolvers().get(resolverName).attributes())
resolversAttributes.add("\"" + attributeName + "\"");
attributesResolversSummary.add("\"" + resolverName + "\":{\"attributes\":[" + String.join(",", resolversAttributes) + "]}");
}
String attributesResolversFilterTreeLogged = Json.ORDERED_MAPPER.writeValueAsString(resolversFilterTreeGrouped);
filtersLoggedList.add("\"attributes\":{\"tree\":" + attributesResolversFilterTreeLogged + ",\"resolvers\":{" + String.join(",", attributesResolversSummary) + "}}");
} else {
filtersLoggedList.add("\"attributes\":null");
}
if (!termResolvers.isEmpty() && !termResolversFilterTree.isEmpty()) {
List<String> termsResolversSummary = new ArrayList<>();
for (String resolverName : termResolvers) {
List<String> resolverAttributes = new ArrayList<>();
for (String attributeName : input.model().resolvers().get(resolverName).attributes())
resolverAttributes.add("\"" + attributeName + "\"");
termsResolversSummary.add("\"" + resolverName + "\":{\"attributes\":[" + String.join(",", resolverAttributes) + "]}");
}
String termResolversFilterTreeLogged = Json.ORDERED_MAPPER.writeValueAsString(termResolversFilterTree);
filtersLoggedList.add("\"terms\":{\"tree\":{\"0\":" + termResolversFilterTreeLogged + "},\"resolvers\":{" + String.join(",", termsResolversSummary) + "}}");
} else {
filtersLoggedList.add("\"terms\":null");
}
String filtersLogged = String.join(",", filtersLoggedList);
String searchLogged = "{\"request\":" + request + ",\"response\":" + response + "}";
return "{\"_hop\":" + _hop + ",\"_query\":" + _query + ",\"_index\":\"" + indexName + "\",\"filters\":{" + filtersLogged + "},\"search\":" + searchLogged + "}";
}

public static String makeScriptFieldsClause(Input input, String indexName) throws ValidationException {
List<String> scriptFieldClauses = new ArrayList<>();

@@ -468,8 +514,10 @@ else if (size == 1)
private void resetState() {
this.attributes = new TreeMap<>(this.input().attributes());
this.docIds = new TreeMap<>();
this.error = null;
this.hits = new ArrayList<>();
this.hop = 0;
this.missingIndices = new TreeSet<>();
this.queries = new ArrayList<>();
this.ran = false;
}
@@ -589,6 +637,10 @@ private void traverse() throws IOException, ValidationException {
// Construct a query for each index that maps to a resolver.
for (String indexName : this.input.model().indices().keySet()) {

// Skip this index if a prior hop determined the index to be missing.
if (this.missingIndices.contains(indexName))
continue;

// Track _ids for this index.
if (!this.docIds.containsKey(indexName))
this.docIds.put(indexName, new TreeSet<>());
@@ -933,55 +985,55 @@ else if (!resolversClause.isEmpty())
query = "{" + String.join(",", topLevelClauses) + "}";

// Submit query to Elasticsearch.
SearchResponse response = this.search(indexName, query);
SearchResponse response = null;
ElasticsearchException responseError = null;
boolean fatalError = false;
try {
response = this.search(indexName, query);
} catch (IndexNotFoundException e) {
// Don't fail the job if an index was missing.
this.missingIndices.add(e.getIndex().getName());
responseError = e;
} catch (ElasticsearchException e) {
// Fail the job for any other error.
fatalError = true;
responseError = e;
}

// Read response from Elasticsearch.
String responseBody = response.toString();
JsonNode responseData = Json.ORDERED_MAPPER.readTree(responseBody);
JsonNode responseData = null;
if (response != null)
responseData = Json.ORDERED_MAPPER.readTree(response.toString());

// Log queries.
if (this.includeQueries || this.profile) {
JsonNode responseDataCopy = responseData.deepCopy();
ObjectNode responseDataCopyObj = (ObjectNode) responseDataCopy;
if (responseDataCopyObj.has("hits")) {
ObjectNode responseDataCopyObjHits = (ObjectNode) responseDataCopyObj.get("hits");
if (responseDataCopyObjHits.has("hits"))
responseDataCopyObjHits.remove("hits");
}
List<String> filtersLoggedList = new ArrayList<>();
if (!resolvers.isEmpty() && !resolversFilterTreeGrouped.isEmpty()) {
List<String> attributesResolversSummary = new ArrayList<>();
for (String resolverName : resolvers) {
List<String> resolversAttributes = new ArrayList<>();
for (String attributeName : input.model().resolvers().get(resolverName).attributes())
resolversAttributes.add("\"" + attributeName + "\"");
attributesResolversSummary.add("\"" + resolverName + "\":{\"attributes\":[" + String.join(",", resolversAttributes) + "]}");
String responseString;
if (responseData != null) {
JsonNode responseDataCopy = responseData.deepCopy();
ObjectNode responseDataCopyObj = (ObjectNode) responseDataCopy;
if (responseDataCopyObj.has("hits")) {
ObjectNode responseDataCopyObjHits = (ObjectNode) responseDataCopyObj.get("hits");
if (responseDataCopyObjHits.has("hits"))
responseDataCopyObjHits.remove("hits");
}
String attributesResolversFilterTreeLogged = Json.ORDERED_MAPPER.writeValueAsString(resolversFilterTreeGrouped);
filtersLoggedList.add("\"attributes\":{\"tree\":" + attributesResolversFilterTreeLogged + ",\"resolvers\":{" + String.join(",", attributesResolversSummary) + "}}");
responseString = responseDataCopyObj.toString();
} else {
filtersLoggedList.add("\"attributes\":null");
responseString = serializeElasticsearchException(responseError);
}
if (!termResolvers.isEmpty() && !termResolversFilterTree.isEmpty()) {
List<String> termsResolversSummary = new ArrayList<>();
for (String resolverName : termResolvers) {
List<String> resolverAttributes = new ArrayList<>();
for (String attributeName : input.model().resolvers().get(resolverName).attributes())
resolverAttributes.add("\"" + attributeName + "\"");
termsResolversSummary.add("\"" + resolverName + "\":{\"attributes\":[" + String.join(",", resolverAttributes) + "]}");
}
String termResolversFilterTreeLogged = Json.ORDERED_MAPPER.writeValueAsString(termResolversFilterTree);
filtersLoggedList.add("\"terms\":{\"tree\":{\"0\":" + termResolversFilterTreeLogged + "},\"resolvers\":{" + String.join(",", termsResolversSummary) + "}}");
} else {
filtersLoggedList.add("\"terms\":null");
}
String filtersLogged = String.join(",", filtersLoggedList);
String searchLogged = "{\"request\":" + query + ",\"response\":" + responseDataCopyObj + "}";
String logged = "{\"_hop\":" + this.hop + ",\"_query\":" + _query + ",\"_index\":\"" + indexName + "\",\"filters\":{" + filtersLogged + "},\"search\":" + searchLogged + "}";
String logged = serializeLoggedQuery(this.input, this.hop, _query, indexName, query, responseString, resolvers, resolversFilterTreeGrouped, termResolvers, termResolversFilterTree);
this.queries.add(logged);
}

// Stop traversing if there was an error not due to a missing index.
// Include the logged query in the response.
if (fatalError) {
this.error = serializeLoggedQuery(this.input, this.hop, _query, indexName, query, serializeElasticsearchException(responseError), resolvers, resolversFilterTreeGrouped, termResolvers, termResolversFilterTree);
return;
}

// Read the hits
if (responseData == null)
continue;
if (!responseData.has("hits"))
continue;
if (!responseData.get("hits").has("hits"))
@@ -1183,6 +1235,8 @@ public String run() throws IOException, ValidationException {
// Format response
List<String> responseParts = new ArrayList<>();
responseParts.add("\"took\":" + Long.toString(took));
if (this.error != null)
responseParts.add("\"error\":" + this.error);
if (this.includeHits)
responseParts.add("\"hits\":{\"total\":" + this.hits.size() + ",\"hits\":[" + String.join(",", this.hits) + "]}");
if (this.includeQueries || this.profile)

0 comments on commit 3e96b6d

Please sign in to comment.
You can’t perform that action at this time.