Commit

Added new URL parameters to the Resolution API: max_time_per_query, _seq_no_primary_term, _version, and several parameters for advanced search optimizations.
davemoore- committed Mar 7, 2020
1 parent 3d325e2 commit 14bbe8a
Showing 4 changed files with 190 additions and 9 deletions.
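
For reference, all of the new options are plain URL parameters on the resolution endpoint. The sketch below shows how a client might set them with the Elasticsearch low-level REST client; the host, the entity type name ("my_entity_type"), and the request body are placeholder assumptions, and the parameter names mirror the integration test added to JobIT.java below.

import org.apache.http.HttpHost;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.RestClient;

public class ResolutionRequestExample {
    public static void main(String[] args) throws Exception {
        try (RestClient client = RestClient.builder(new HttpHost("localhost", 9200, "http")).build()) {
            Request request = new Request("POST", "_zentity/resolution/my_entity_type");
            request.setJsonEntity("{\"attributes\":{\"name\":[\"Alice Jones\"]}}");
            // New parameters introduced by this commit.
            request.addParameter("_seq_no_primary_term", "true");                 // include _seq_no and _primary_term on each hit
            request.addParameter("_version", "true");                             // include _version on each hit
            request.addParameter("max_time_per_query", "5s");                     // timeout applied to each search
            request.addParameter("search.allow_partial_search_results", "true");
            request.addParameter("search.batched_reduce_size", "5");
            request.addParameter("search.max_concurrent_shard_requests", "5");
            request.addParameter("search.pre_filter_shard_size", "5");
            request.addParameter("search.preference", "_local");
            request.addParameter("search.request_cache", "true");
            Response response = client.performRequest(request);
            System.out.println(response.getStatusLine());
        }
    }
}

Parameters that are omitted keep their defaults; the search.* options default to null, which means Elasticsearch's own defaults apply.
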
2 changes: 1 addition & 1 deletion pom.xml
@@ -16,7 +16,7 @@
<zentity.website>https://zentity.io</zentity.website>
<zentity.version>1.5.1</zentity.version>
<!-- dependency versions -->
<elasticsearch.version>7.6.0</elasticsearch.version>
<elasticsearch.version>7.6.1</elasticsearch.version>
<jackson.core.version>2.9.10</jackson.core.version>
<jackson.databind.version>2.9.10.3</jackson.databind.version>
<jdk.version>1.11</jdk.version>
105 changes: 98 additions & 7 deletions src/main/java/io/zentity/resolution/Job.java
@@ -23,6 +23,7 @@
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.DeprecationHandler;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.ToXContent;
@@ -61,25 +62,47 @@ public class Job {
public static final boolean DEFAULT_INCLUDE_EXPLANATION = false;
public static final boolean DEFAULT_INCLUDE_HITS = true;
public static final boolean DEFAULT_INCLUDE_QUERIES = false;
public static final boolean DEFAULT_INCLUDE_SEQ_NO_PRIMARY_TERM = false;
public static final boolean DEFAULT_INCLUDE_SOURCE = true;
public static final boolean DEFAULT_INCLUDE_VERSION = false;
public static final int DEFAULT_MAX_DOCS_PER_QUERY = 1000;
public static final int DEFAULT_MAX_HOPS = 100;
public static final String DEFAULT_MAX_TIME_PER_QUERY = "10s";
public static final boolean DEFAULT_PRETTY = false;
public static final boolean DEFAULT_PROFILE = false;

// Constants (optional search parameters)
public static Boolean DEFAULT_SEARCH_ALLOW_PARTIAL_SEARCH_RESULTS = null;
public static Integer DEFAULT_SEARCH_BATCHED_REDUCE_SIZE = null;
public static Integer DEFAULT_SEARCH_MAX_CONCURRENT_SHARD_REQUESTS = null;
public static Integer DEFAULT_SEARCH_PRE_FILTER_SHARD_SIZE = null;
public static String DEFAULT_SEARCH_PREFERENCE = null;
public static Boolean DEFAULT_SEARCH_REQUEST_CACHE = null;

// Job configuration
private Input input;
private boolean includeAttributes = DEFAULT_INCLUDE_ATTRIBUTES;
private boolean includeErrorTrace = DEFAULT_INCLUDE_ERROR_TRACE;
private boolean includeExplanation = DEFAULT_INCLUDE_EXPLANATION;
private boolean includeHits = DEFAULT_INCLUDE_HITS;
private boolean includeQueries = DEFAULT_INCLUDE_QUERIES;
private boolean includeSeqNoPrimaryTerm = DEFAULT_INCLUDE_SEQ_NO_PRIMARY_TERM;
private boolean includeSource = DEFAULT_INCLUDE_SOURCE;
private boolean includeVersion = DEFAULT_INCLUDE_VERSION;
private int maxDocsPerQuery = DEFAULT_MAX_DOCS_PER_QUERY;
private int maxHops = DEFAULT_MAX_HOPS;
private String maxTimePerQuery = DEFAULT_MAX_TIME_PER_QUERY;
private boolean pretty = DEFAULT_PRETTY;
private boolean profile = DEFAULT_PROFILE;

// Job configuration (optional search parameters)
private Boolean searchAllowPartialSearchResults = DEFAULT_SEARCH_ALLOW_PARTIAL_SEARCH_RESULTS;
private Integer searchBatchedReduceSize = DEFAULT_SEARCH_BATCHED_REDUCE_SIZE;
private Integer searchMaxConcurrentShardRequests = DEFAULT_SEARCH_MAX_CONCURRENT_SHARD_REQUESTS;
private Integer searchPreFilterShardSize = DEFAULT_SEARCH_PRE_FILTER_SHARD_SIZE;
private String searchPreference = DEFAULT_SEARCH_PREFERENCE;
private Boolean searchRequestCache = DEFAULT_SEARCH_REQUEST_CACHE;

// Job state
private Map<String, Attribute> attributes = new TreeMap<>();
private NodeClient client;
@@ -581,6 +604,12 @@ public void includeQueries(boolean includeQueries) {
this.includeQueries = includeQueries;
}

public Boolean includeSeqNoPrimaryTerm() {
return this.includeSeqNoPrimaryTerm;
}

public void includeSeqNoPrimaryTerm(Boolean includeSeqNoPrimaryTerm) { this.includeSeqNoPrimaryTerm = includeSeqNoPrimaryTerm; }

public boolean includeSource() {
return this.includeSource;
}
@@ -589,13 +618,9 @@ public void includeSource(boolean includeSource) {
this.includeSource = includeSource;
}

public int maxHops() {
return this.maxHops;
}
public Boolean includeVersion() { return this.includeVersion; }

public void maxHops(int maxHops) {
this.maxHops = maxHops;
}
public void includeVersion(Boolean includeVersion) { this.includeVersion = includeVersion; }

public int maxDocsPerQuery() {
return this.maxDocsPerQuery;
@@ -605,6 +630,18 @@ public void maxDocsPerQuery(int maxDocsPerQuery) {
this.maxDocsPerQuery = maxDocsPerQuery;
}

public int maxHops() {
return this.maxHops;
}

public void maxHops(int maxHops) {
this.maxHops = maxHops;
}

public String maxTimePerQuery() { return this.maxTimePerQuery; }

public void maxTimePerQuery(String maxTimePerQuery) { this.maxTimePerQuery = maxTimePerQuery; }

public boolean pretty() {
return this.pretty;
}
@@ -621,6 +658,41 @@ public void profile(Boolean profile) {
this.profile = profile;
}

public Boolean searchAllowPartialSearchResults() {
return this.searchAllowPartialSearchResults;
}

public void searchAllowPartialSearchResults(Boolean searchAllowPartialSearchResults) { this.searchAllowPartialSearchResults = searchAllowPartialSearchResults; }

public Integer searchBatchedReduceSize() {
return this.searchBatchedReduceSize;
}

public void searchBatchedReduceSize(Integer searchBatchedReduceSize) { this.searchBatchedReduceSize = searchBatchedReduceSize; }

public Integer searchMaxConcurrentShardRequests() {
return this.searchMaxConcurrentShardRequests;
}

public void searchMaxConcurrentShardRequests(Integer searchMaxConcurrentShardRequests) { this.searchMaxConcurrentShardRequests = searchMaxConcurrentShardRequests; }

public Integer searchPreFilterShardSize() {
return this.searchPreFilterShardSize;
}

public void searchPreFilterShardSize(Integer searchPreFilterShardSize) { this.searchPreFilterShardSize = searchPreFilterShardSize; }

public String searchPreference() {
return this.searchPreference;
}

public void searchPreference(String searchPreference) { this.searchPreference = searchPreference; }

public Boolean searchRequestCache() {
return this.searchRequestCache;
}

public void searchRequestCache(Boolean searchRequestCache) { this.searchRequestCache = searchRequestCache; }

public Input input() {
return this.input;
}
@@ -653,7 +725,22 @@ private SearchResponse search(String indexName, String query) throws IOException
searchSourceBuilder.parseXContent(parser);
}
SearchRequestBuilder searchRequestBuilder = new SearchRequestBuilder(client, SearchAction.INSTANCE);
return searchRequestBuilder.setIndices(indexName).setSource(searchSourceBuilder).execute().actionGet();
searchRequestBuilder.setIndices(indexName).setSource(searchSourceBuilder);
if (this.searchAllowPartialSearchResults != null)
searchRequestBuilder.setAllowPartialSearchResults(this.searchAllowPartialSearchResults);
if (this.searchBatchedReduceSize != null)
searchRequestBuilder.setBatchedReduceSize(this.searchBatchedReduceSize);
if (this.searchMaxConcurrentShardRequests != null)
searchRequestBuilder.setMaxConcurrentShardRequests(this.searchMaxConcurrentShardRequests);
if (this.searchPreFilterShardSize != null)
searchRequestBuilder.setPreFilterShardSize(this.searchPreFilterShardSize);
if (this.searchPreference != null)
searchRequestBuilder.setPreference(this.searchPreference);
if (this.searchRequestCache != null)
searchRequestBuilder.setRequestCache(this.searchRequestCache);
if (this.maxTimePerQuery != null)
searchRequestBuilder.setTimeout(TimeValue.parseTimeValue(this.maxTimePerQuery, "timeout"));
return searchRequestBuilder.execute().actionGet();
}

/**
@@ -1018,6 +1105,10 @@ else if (!resolversClause.isEmpty())
// Construct the "profile" clause.
if (this.profile)
topLevelClauses.add("\"profile\":true");
if (this.includeSeqNoPrimaryTerm)
topLevelClauses.add("\"seq_no_primary_term\":true");
if (this.includeVersion)
topLevelClauses.add("\"version\":true");

// Construct the final query.
query = "{" + String.join(",", topLevelClauses) + "}";
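
Read in isolation, the changes to search() above follow one pattern: options left at null are simply not applied, so Elasticsearch keeps its own defaults. A minimal sketch of that pattern, using only the SearchRequestBuilder setters that appear in the diff (the SearchOptions holder class is illustrative, not part of the plugin):

import org.elasticsearch.action.search.SearchAction;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.common.unit.TimeValue;

class SearchOptions {
    Boolean allowPartialSearchResults;  // search.allow_partial_search_results
    Integer batchedReduceSize;          // search.batched_reduce_size
    Integer maxConcurrentShardRequests; // search.max_concurrent_shard_requests
    Integer preFilterShardSize;         // search.pre_filter_shard_size
    String preference;                  // search.preference
    Boolean requestCache;               // search.request_cache
    String maxTimePerQuery;             // max_time_per_query, e.g. "10s"

    // Apply only the options that were explicitly set; null means "use the Elasticsearch default".
    SearchRequestBuilder apply(NodeClient client, String indexName) {
        SearchRequestBuilder builder = new SearchRequestBuilder(client, SearchAction.INSTANCE).setIndices(indexName);
        if (allowPartialSearchResults != null) builder.setAllowPartialSearchResults(allowPartialSearchResults);
        if (batchedReduceSize != null) builder.setBatchedReduceSize(batchedReduceSize);
        if (maxConcurrentShardRequests != null) builder.setMaxConcurrentShardRequests(maxConcurrentShardRequests);
        if (preFilterShardSize != null) builder.setPreFilterShardSize(preFilterShardSize);
        if (preference != null) builder.setPreference(preference);
        if (requestCache != null) builder.setRequestCache(requestCache);
        if (maxTimePerQuery != null) builder.setTimeout(TimeValue.parseTimeValue(maxTimePerQuery, "timeout"));
        return builder;
    }
}

The _seq_no_primary_term and _version flags, by contrast, are not builder settings; the last hunk above injects them as top-level "seq_no_primary_term" and "version" keys in each generated search body.
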
@@ -40,20 +40,41 @@ protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient
Boolean includeExplanation = restRequest.paramAsBoolean("_explanation", Job.DEFAULT_INCLUDE_EXPLANATION);
Boolean includeHits = restRequest.paramAsBoolean("hits", Job.DEFAULT_INCLUDE_HITS);
Boolean includeQueries = restRequest.paramAsBoolean("queries", Job.DEFAULT_INCLUDE_QUERIES);
Boolean includeSeqNoPrimaryTerm = restRequest.paramAsBoolean("_seq_no_primary_term", Job.DEFAULT_INCLUDE_SEQ_NO_PRIMARY_TERM);
Boolean includeSource = restRequest.paramAsBoolean("_source", Job.DEFAULT_INCLUDE_SOURCE);
Boolean includeVersion = restRequest.paramAsBoolean("_version", Job.DEFAULT_INCLUDE_VERSION);
int maxDocsPerQuery = restRequest.paramAsInt("max_docs_per_query", Job.DEFAULT_MAX_DOCS_PER_QUERY);
int maxHops = restRequest.paramAsInt("max_hops", Job.DEFAULT_MAX_HOPS);
String maxTimePerQuery = restRequest.param("max_time_per_query", Job.DEFAULT_MAX_TIME_PER_QUERY);
Boolean pretty = restRequest.paramAsBoolean("pretty", Job.DEFAULT_PRETTY);
Boolean profile = restRequest.paramAsBoolean("profile", Job.DEFAULT_PROFILE);

// Parse any optional search parameters that will be passed to the job configuration.
// Note: org.elasticsearch.rest.RestRequest doesn't allow null values as default values for integer parameters,
// which is why the code below handles the integer parameters differently from the others.
Boolean searchAllowPartialSearchResults = restRequest.paramAsBoolean("search.allow_partial_search_results", Job.DEFAULT_SEARCH_ALLOW_PARTIAL_SEARCH_RESULTS);
Integer searchBatchedReduceSize = Job.DEFAULT_SEARCH_BATCHED_REDUCE_SIZE;
if (restRequest.hasParam("search.batched_reduce_size"))
searchBatchedReduceSize = Integer.parseInt(restRequest.param("search.batched_reduce_size"));
Integer searchMaxConcurrentShardRequests = Job.DEFAULT_SEARCH_MAX_CONCURRENT_SHARD_REQUESTS;
if (restRequest.hasParam("search.max_concurrent_shard_requests"))
searchMaxConcurrentShardRequests = Integer.parseInt(restRequest.param("search.max_concurrent_shard_requests"));
Integer searchPreFilterShardSize = Job.DEFAULT_SEARCH_PRE_FILTER_SHARD_SIZE;
if (restRequest.hasParam("search.pre_filter_shard_size"))
searchPreFilterShardSize = Integer.parseInt(restRequest.param("search.pre_filter_shard_size"));
String searchPreference = restRequest.param("search.preference", Job.DEFAULT_SEARCH_PREFERENCE);
Boolean searchRequestCache = restRequest.paramAsBoolean("search.request_cache", Job.DEFAULT_SEARCH_REQUEST_CACHE);
Integer finalSearchBatchedReduceSize = searchBatchedReduceSize;
Integer finalSearchMaxConcurrentShardRequests = searchMaxConcurrentShardRequests;
Integer finalSearchPreFilterShardSize = searchPreFilterShardSize;

return channel -> {
try {

// Validate the request body.
if (body == null || body.equals(""))
throw new ValidationException("Request body is missing.");


// Parse and validate the job input.
Input input;
if (entityType == null || entityType.equals("")) {
@@ -73,13 +94,24 @@ protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient
job.includeExplanation(includeExplanation);
job.includeHits(includeHits);
job.includeQueries(includeQueries);
job.includeSeqNoPrimaryTerm(includeSeqNoPrimaryTerm);
job.includeSource(includeSource);
job.includeVersion(includeVersion);
job.maxDocsPerQuery(maxDocsPerQuery);
job.maxHops(maxHops);
job.maxTimePerQuery(maxTimePerQuery);
job.pretty(pretty);
job.profile(profile);
job.input(input);

// Optional search parameters
job.searchAllowPartialSearchResults(searchAllowPartialSearchResults);
job.searchBatchedReduceSize(finalSearchBatchedReduceSize);
job.searchMaxConcurrentShardRequests(finalSearchMaxConcurrentShardRequests);
job.searchPreFilterShardSize(finalSearchPreFilterShardSize);
job.searchPreference(searchPreference);
job.searchRequestCache(searchRequestCache);

// Run the entity resolution job.
String response = job.run();
if (job.failed())
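
The note about RestRequest and integer defaults explains the repeated hasParam/parseInt blocks above. If that pattern grows, it could be factored into a small helper; a sketch, not part of this commit, under the assumption that a missing parameter should surface as null:

import org.elasticsearch.rest.RestRequest;

final class RestParams {

    private RestParams() {}

    // RestRequest.paramAsInt(name, defaultValue) requires a primitive int default, so an absent
    // parameter cannot be represented as null through it. This helper returns the (possibly null)
    // default when the parameter is missing, and parses it otherwise.
    static Integer paramAsInteger(RestRequest request, String name, Integer defaultValue) {
        if (!request.hasParam(name))
            return defaultValue;
        return Integer.parseInt(request.param(name));
    }
}

With such a helper, each optional size becomes a one-liner, e.g. Integer searchBatchedReduceSize = RestParams.paramAsInteger(restRequest, "search.batched_reduce_size", Job.DEFAULT_SEARCH_BATCHED_REDUCE_SIZE);, and the effectively-final copies made for the lambda would no longer be needed.
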
58 changes: 58 additions & 0 deletions src/test/java/io/zentity/resolution/JobIT.java
@@ -1716,4 +1716,62 @@ public void testJobArrays() throws Exception {
destroyTestResources(testResourceSet);
}
}

public void testJobSearchParams() throws Exception {
int testResourceSet = TEST_RESOURCES_A;
prepareTestResources(testResourceSet);
try {
String endpoint = "_zentity/resolution/zentity_test_entity_a";
Request postResolution = new Request("POST", endpoint);
postResolution.setEntity(TEST_PAYLOAD_JOB_ATTRIBUTES);
postResolution.addParameter("_seq_no_primary_term", "true");
postResolution.addParameter("_version", "true");
postResolution.addParameter("max_time_per_query", "5s");
postResolution.addParameter("search.allow_partial_search_results", "true");
postResolution.addParameter("search.batched_reduce_size", "5");
postResolution.addParameter("search.max_concurrent_shard_requests", "5");
postResolution.addParameter("search.pre_filter_shard_size", "5");
postResolution.addParameter("search.request_cache", "true");
Response response = client.performRequest(postResolution);
JsonNode json = Json.MAPPER.readTree(response.getEntity().getContent());
assertEquals(6, json.get("hits").get("total").asInt());
Set<String> docsExpected = new TreeSet<>();
docsExpected.add("a0,0");
docsExpected.add("b0,0");
docsExpected.add("c0,1");
docsExpected.add("a1,2");
docsExpected.add("b1,3");
docsExpected.add("c1,4");
assertEquals(docsExpected, getActual(json));
for (JsonNode doc : json.get("hits").get("hits")) {
assertTrue(doc.has("_primary_term"));
assertTrue(doc.has("_seq_no"));
assertTrue(doc.has("_version"));
}

String endpoint2 = "_zentity/resolution/zentity_test_entity_a";
Request postResolution2 = new Request("POST", endpoint2);
postResolution2.setEntity(TEST_PAYLOAD_JOB_ATTRIBUTES);
postResolution2.addParameter("_seq_no_primary_term", "false");
postResolution2.addParameter("_version", "false");
Response response2 = client.performRequest(postResolution2);
JsonNode json2 = Json.MAPPER.readTree(response2.getEntity().getContent());
assertEquals(6, json2.get("hits").get("total").asInt());
Set<String> docsExpected2 = new TreeSet<>();
docsExpected2.add("a0,0");
docsExpected2.add("b0,0");
docsExpected2.add("c0,1");
docsExpected2.add("a1,2");
docsExpected2.add("b1,3");
docsExpected2.add("c1,4");
assertEquals(docsExpected2, getActual(json2));
for (JsonNode doc : json2.get("hits").get("hits")) {
assertFalse(doc.has("_primary_term"));
assertFalse(doc.has("_seq_no"));
assertFalse(doc.has("_version"));
}
} finally {
destroyTestResources(testResourceSet);
}
}
}
