Skip to content

Commit

Permalink
Handle all exceptions thrown during a resolution job. Allow the Java …
Browse files Browse the repository at this point in the history
…stack trace and all hits and queries up to the point of failure to be included in the response when a resolution job fails. Set the HTTP response status code to 500 when a resolution job fails.
  • Loading branch information
davemoore- committed Sep 9, 2019
1 parent 431e2d8 commit f4e629c
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 31 deletions.
93 changes: 63 additions & 30 deletions src/main/java/io/zentity/resolution/Job.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@
import org.elasticsearch.search.builder.SearchSourceBuilder;

import java.io.IOException;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Base64;
Expand All @@ -53,6 +55,7 @@ public class Job {

// Constants
public static final boolean DEFAULT_INCLUDE_ATTRIBUTES = true;
public static final boolean DEFAULT_INCLUDE_ERROR_TRACE = true;
public static final boolean DEFAULT_INCLUDE_EXPLANATION = false;
public static final boolean DEFAULT_INCLUDE_HITS = true;
public static final boolean DEFAULT_INCLUDE_QUERIES = false;
Expand All @@ -65,6 +68,7 @@ public class Job {
// Job configuration
private Input input;
private boolean includeAttributes = DEFAULT_INCLUDE_ATTRIBUTES;
private boolean includeErrorTrace = DEFAULT_INCLUDE_ERROR_TRACE;
private boolean includeExplanation = DEFAULT_INCLUDE_EXPLANATION;
private boolean includeHits = DEFAULT_INCLUDE_HITS;
private boolean includeQueries = DEFAULT_INCLUDE_QUERIES;
Expand All @@ -79,6 +83,7 @@ public class Job {
private NodeClient client;
private Map<String, Set<String>> docIds = new TreeMap<>();
private String error = null;
private boolean failed = false;
private List<String> hits = new ArrayList<>();
private int hop = 0;
private Set<String> missingIndices = new TreeSet<>();
Expand All @@ -89,16 +94,20 @@ public Job(NodeClient client) {
this.client = client;
}

public static String serializeException(Exception e) throws IOException {
String serialized;
if (e instanceof ElasticsearchException) {
ElasticsearchException ee = (ElasticsearchException) e;
String cause = Strings.toString(ee.toXContent(jsonBuilder().startObject(), ToXContent.EMPTY_PARAMS).endObject());
serialized = "{\"error\":{\"root_cause\":[" + cause + "],\"type\":\"" + ElasticsearchException.getExceptionName(ee) + "\",\"reason\":\"" + e.getMessage() + "\"},\"status\":" + ee.status().getStatus() + "}";
} else {
serialized = "{\"error\":{\"type\":\"" + e.getClass() + "\",\"reason\":\"" + e.getMessage() + "\"}}";
public static String serializeException(Exception e, boolean includeErrorTrace) {
List<String> errorParts = new ArrayList<>();
if (e instanceof ElasticsearchException)
errorParts.add("\"by\":\"elasticsearch\"");
else
errorParts.add("\"by\":\"zentity\"");
errorParts.add("\"type\":\"" + e.getClass().getCanonicalName() + "\"");
errorParts.add("\"reason\":\"" + e.getMessage() + "\"");
if (includeErrorTrace) {
StringWriter traceWriter = new StringWriter();
e.printStackTrace(new PrintWriter(traceWriter));
errorParts.add("\"stack_trace\":" + Json.quoteString(traceWriter.toString()) + "");
}
return serialized;
return String.join(",", errorParts);
}

public static String serializeLoggedQuery(Input input, int _hop, int _query, String indexName, String request, String response, List<String> resolvers, TreeMap<Integer, TreeMap<String, TreeMap>> resolversFilterTreeGrouped, List<String> termResolvers, TreeMap<String, TreeMap> termResolversFilterTree) throws JsonProcessingException {
Expand Down Expand Up @@ -522,6 +531,7 @@ private void resetState() {
this.attributes = new TreeMap<>(this.input().attributes());
this.docIds = new TreeMap<>();
this.error = null;
this.failed = false;
this.hits = new ArrayList<>();
this.hop = 0;
this.missingIndices = new TreeSet<>();
Expand All @@ -537,6 +547,14 @@ public void includeAttributes(boolean includeAttributes) {
this.includeAttributes = includeAttributes;
}

public boolean includeErrorTrace() {
return this.includeErrorTrace;
}

public void includeErrorTrace(boolean includeErrorTrace) {
this.includeErrorTrace = includeErrorTrace;
}

public boolean includeExplanation() {
return this.includeExplanation;
}
Expand Down Expand Up @@ -609,6 +627,14 @@ public void input(Input input) {
this.input = input;
}

public boolean failed() {
return this.failed;
}

public boolean ran() {
return this.ran;
}

/**
* Submit a search query to Elasticsearch.
*
Expand Down Expand Up @@ -997,7 +1023,6 @@ else if (!resolversClause.isEmpty())
// Submit query to Elasticsearch.
SearchResponse response = null;
Exception responseError = null;
boolean fatalError = false;
try {
response = this.search(indexName, query);
} catch (IndexNotFoundException e) {
Expand All @@ -1006,7 +1031,7 @@ else if (!resolversClause.isEmpty())
responseError = e;
} catch (Exception e) {
// Fail the job for any other error.
fatalError = true;
this.failed = true;
responseError = e;
}

Expand All @@ -1028,16 +1053,18 @@ else if (!resolversClause.isEmpty())
}
responseString = responseDataCopyObj.toString();
} else {
responseString = serializeException(responseError);
ElasticsearchException e = (ElasticsearchException) responseError;
String cause = Strings.toString(e.toXContent(jsonBuilder().startObject(), ToXContent.EMPTY_PARAMS).endObject());
responseString = "{\"error\":{\"root_cause\":[" + cause + "],\"type\":\"" + ElasticsearchException.getExceptionName(e) + "\",\"reason\":\"" + e.getMessage() + "\"},\"status\":" + e.status().getStatus() + "}";
}
String logged = serializeLoggedQuery(this.input, this.hop, _query, indexName, query, responseString, resolvers, resolversFilterTreeGrouped, termResolvers, termResolversFilterTree);
this.queries.add(logged);
}

// Stop traversing if there was an error not due to a missing index.
// Include the logged query in the response.
if (fatalError) {
this.error = serializeLoggedQuery(this.input, this.hop, _query, indexName, query, serializeException(responseError), resolvers, resolversFilterTreeGrouped, termResolvers, termResolversFilterTree);
if (this.failed) {
this.error = serializeException(responseError, this.includeErrorTrace);
return;
}

Expand Down Expand Up @@ -1228,7 +1255,7 @@ else if (input.model().matchers().containsKey(matcherName))
* @return A JSON string to be returned as the body of the response to a client.
* @throws IOException
*/
public String run() throws IOException, ValidationException {
public String run() throws IOException {
try {

// Reset the state of the job if reusing this Job object.
Expand All @@ -1238,22 +1265,28 @@ public String run() throws IOException, ValidationException {
this.attributes = new TreeMap<>(this.input.attributes());

// Start timer and begin job
String response;
long startTime = System.nanoTime();
this.traverse();
long took = TimeUnit.MILLISECONDS.convert(System.nanoTime() - startTime, TimeUnit.NANOSECONDS);

// Format response
List<String> responseParts = new ArrayList<>();
responseParts.add("\"took\":" + Long.toString(took));
if (this.error != null)
responseParts.add("\"error\":" + this.error);
if (this.includeHits)
responseParts.add("\"hits\":{\"total\":" + this.hits.size() + ",\"hits\":[" + String.join(",", this.hits) + "]}");
if (this.includeQueries || this.profile)
responseParts.add("\"queries\":[" + queries + "]");
String response = "{" + String.join(",", responseParts) + "}";
if (this.pretty)
response = Json.ORDERED_MAPPER.writerWithDefaultPrettyPrinter().writeValueAsString(Json.ORDERED_MAPPER.readTree(response));
try {
this.traverse();
} catch (Exception e) {
this.failed = true;
this.error = serializeException(e, this.includeErrorTrace);
} finally {
long took = TimeUnit.MILLISECONDS.convert(System.nanoTime() - startTime, TimeUnit.NANOSECONDS);
// Format response
List<String> responseParts = new ArrayList<>();
responseParts.add("\"took\":" + Long.toString(took));
if (this.error != null)
responseParts.add("\"error\":{" + this.error + "}");
if (this.includeHits)
responseParts.add("\"hits\":{\"total\":" + this.hits.size() + ",\"hits\":[" + String.join(",", this.hits) + "]}");
if (this.includeQueries || this.profile)
responseParts.add("\"queries\":[" + queries + "]");
response = "{" + String.join(",", responseParts) + "}";
if (this.pretty)
response = Json.ORDERED_MAPPER.writerWithDefaultPrettyPrinter().writeValueAsString(Json.ORDERED_MAPPER.readTree(response));
}
return response;

} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient
// Parse the request params that will be passed to the job configuration
String entityType = restRequest.param("entity_type");
Boolean includeAttributes = restRequest.paramAsBoolean("_attributes", Job.DEFAULT_INCLUDE_ATTRIBUTES);
Boolean includeErrorTrace = restRequest.paramAsBoolean("error_trace", Job.DEFAULT_INCLUDE_ERROR_TRACE);
Boolean includeExplanation = restRequest.paramAsBoolean("_explanation", Job.DEFAULT_INCLUDE_EXPLANATION);
Boolean includeHits = restRequest.paramAsBoolean("hits", Job.DEFAULT_INCLUDE_HITS);
Boolean includeQueries = restRequest.paramAsBoolean("queries", Job.DEFAULT_INCLUDE_QUERIES);
Expand Down Expand Up @@ -70,6 +71,7 @@ protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient
// Prepare the entity resolution job.
Job job = new Job(client);
job.includeAttributes(includeAttributes);
job.includeErrorTrace(includeErrorTrace);
job.includeExplanation(includeExplanation);
job.includeHits(includeHits);
job.includeQueries(includeQueries);
Expand All @@ -82,7 +84,10 @@ protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient

// Run the entity resolution job.
String response = job.run();
channel.sendResponse(new BytesRestResponse(RestStatus.OK, "application/json", response));
if (job.failed())
channel.sendResponse(new BytesRestResponse(RestStatus.INTERNAL_SERVER_ERROR, "application/json", response));
else
channel.sendResponse(new BytesRestResponse(RestStatus.OK, "application/json", response));

} catch (ValidationException e) {
channel.sendResponse(new BytesRestResponse(channel, RestStatus.BAD_REQUEST, e));
Expand Down

0 comments on commit f4e629c

Please sign in to comment.