Navigation Menu

Skip to content

Commit

Permalink
Resolution input now supports an optional "ids" field to start the jo…
Browse files Browse the repository at this point in the history
…b by document _id(s) by index. In the first iteration of a job, documents are fetched by _id(s) by index and the attributes of those documents are acquired. Any _id supplied to "ids" is assumed to be representative of the entity. Jobs must have either or both of "ids" and "attributes" to run.
  • Loading branch information
davemoore- committed May 17, 2018
1 parent 9ea4b95 commit b2f48bc
Show file tree
Hide file tree
Showing 6 changed files with 375 additions and 42 deletions.
18 changes: 18 additions & 0 deletions src/main/java/io/zentity/common/Json.java
@@ -1,11 +1,29 @@
package io.zentity.common;

import com.fasterxml.jackson.core.io.JsonStringEncoder;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;

public class Json {

public static final ObjectMapper MAPPER = new ObjectMapper();
public static final ObjectMapper ORDERED_MAPPER = new ObjectMapper().configure(SerializationFeature.ORDER_MAP_ENTRIES_BY_KEYS, true);
private static final JsonStringEncoder STRING_ENCODER = new JsonStringEncoder();

public static String quoteString(String value) {
return jsonStringFormat(value);
}

private static String jsonStringEscape(String value) {
return new String(STRING_ENCODER.quoteAsString(value));
}

private static String jsonStringQuote(String value) {
return "\"" + value + "\"";
}

private static String jsonStringFormat(String value) {
return jsonStringQuote(jsonStringEscape(value));
}

}
57 changes: 32 additions & 25 deletions src/main/java/io/zentity/resolution/Job.java
@@ -1,6 +1,5 @@
package io.zentity.resolution;

import com.fasterxml.jackson.core.io.JsonStringEncoder;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import io.zentity.common.Json;
Expand Down Expand Up @@ -46,7 +45,6 @@ public class Job {
public static final int DEFAULT_MAX_HOPS = 100;
public static final boolean DEFAULT_PRETTY = false;
public static final boolean DEFAULT_PROFILE = false;
private final JsonStringEncoder encoder = new JsonStringEncoder();

// Job configuration
private Input input;
Expand Down Expand Up @@ -499,18 +497,6 @@ public void input(Input input) {
this.input = input;
}

private String jsonStringEscape(String value) {
return new String(encoder.quoteAsString(value));
}

private String jsonStringQuote(String value) {
return "\"" + value + "\"";
}

private String jsonStringFormat(String value) {
return jsonStringQuote(jsonStringEscape(value));
}

/**
* Submit a search query to Elasticsearch.
*
Expand Down Expand Up @@ -549,12 +535,14 @@ private void traverse() throws IOException, ValidationException {
if (!this.docIds.containsKey(indexName))
this.docIds.put(indexName, new TreeSet<>());

boolean filterIds = this.hop == 0 && this.input().ids().containsKey(indexName) && !this.input().ids().get(indexName).isEmpty();

// Determine which resolvers can be queried for this index.
List<String> resolvers = new ArrayList<>();
for (String resolverName : this.input.model().resolvers().keySet())
if (canQuery(this.input.model(), indexName, resolverName, this.attributes))
resolvers.add(resolverName);
if (resolvers.size() == 0)
if (resolvers.size() == 0 && !filterIds)
continue;

// Construct query for this index.
Expand All @@ -563,13 +551,14 @@ private void traverse() throws IOException, ValidationException {
List<String> queryClauses = new ArrayList<>();
List<String> queryMustNotClauses = new ArrayList<>();
List<String> queryFilterClauses = new ArrayList<>();
List<String> queryResolverClauses = new ArrayList<>();
List<String> topLevelClauses = new ArrayList<>();
topLevelClauses.add("\"_source\":true");

// Exclude docs by _id
Set<String> ids = this.docIds.get(indexName);
if (!ids.isEmpty())
queryMustNotClauses.add("{\"ids\":{\"values\":[" + String.join(",", ids) + "]}}");
Set<String> docIds = this.docIds.get(indexName);
if (!docIds.isEmpty())
queryMustNotClauses.add("{\"ids\":{\"values\":[" + String.join(",", docIds) + "]}}");

// Create "scope.exclude.attributes" clauses. Combine them into a single "should" clause.
if (!this.input.scope().exclude().attributes().isEmpty()) {
Expand All @@ -585,7 +574,7 @@ else if (size == 1)
if (!queryMustNotClauses.isEmpty())
queryClauses.add("\"must_not\":[" + String.join(",", queryMustNotClauses) + "]");

// Create "scope.include.attributes" clauses. Combine them into a single "filter" clause.
// Construct "scope.include.attributes" clauses. Combine them into a single "filter" clause.
if (!this.input.scope().include().attributes().isEmpty()) {
List<String> attributeClauses = makeAttributeClauses(this.input.model(), indexName, this.input.scope().include().attributes(), "filter");
int size = attributeClauses.size();
Expand All @@ -595,12 +584,30 @@ else if (size == 1)
queryFilterClauses.add(attributeClauses.get(0));
}

// Construct the "ids" clause if this is the first hop and if any ids are specified for this index.
String idsClause = "{}";
if (filterIds) {
Set<String> ids = this.input().ids().get(indexName);
idsClause = "{\"bool\":{\"filter\":[{\"ids\":{\"values\":[" + String.join(",", ids) + "]}}]}}";
}

// Construct the resolvers clause.
Map<String, Integer> counts = countAttributesAcrossResolvers(this.input.model(), resolvers);
List<List<String>> resolversSorted = sortResolverAttributes(this.input.model(), resolvers, counts);
TreeMap<String, TreeMap> resolversFilterTree = makeResolversFilterTree(resolversSorted);
String resolversClause = populateResolversFilterTree(this.input.model(), indexName, resolversFilterTree, this.attributes);
queryFilterClauses.add(resolversClause);
String resolversClause = "{}";
TreeMap<String, TreeMap> resolversFilterTree = new TreeMap<>();
if (!this.attributes.isEmpty()) {
Map<String, Integer> counts = countAttributesAcrossResolvers(this.input.model(), resolvers);
List<List<String>> resolversSorted = sortResolverAttributes(this.input.model(), resolvers, counts);
resolversFilterTree = makeResolversFilterTree(resolversSorted);
resolversClause = populateResolversFilterTree(this.input.model(), indexName, resolversFilterTree, this.attributes);
}

// Combine the ids clause and resolvers clause in a "should" clause if necessary.
if (!idsClause.equals("{}") && !resolversClause.equals("{}"))
queryFilterClauses.add("{\"bool\":{\"should\":[" + idsClause + "," + resolversClause + "]}}");
else if (!idsClause.equals("{}"))
queryFilterClauses.add(idsClause);
else
queryFilterClauses.add(resolversClause);

// Construct the top-level "filter" clause.
if (!queryFilterClauses.isEmpty()) {
Expand Down Expand Up @@ -662,7 +669,7 @@ else if (size == 1)
for (JsonNode doc : responseData.get("hits").get("hits")) {

// Skip doc if already fetched. Otherwise mark doc as fetched and then proceed.
String _id = jsonStringFormat(doc.get("_id").textValue());
String _id = Json.quoteString(doc.get("_id").textValue());
if (this.docIds.get(indexName).contains(_id))
continue;
this.docIds.get(indexName).add(_id);
Expand Down
61 changes: 56 additions & 5 deletions src/main/java/io/zentity/resolution/input/Input.java
Expand Up @@ -19,6 +19,7 @@
public class Input {

private Map<String, Attribute> attributes = new TreeMap<>();
private Map<String, Set<String>> ids = new TreeMap<>();
private String entityType;
private Model model;
private Scope scope = new Scope();
Expand Down Expand Up @@ -127,6 +128,47 @@ public static Model includeResolvers(Model model, Set<String> resolvers) throws
return model;
}

/**
* Parse and validate the "ids" field of the request body.
*
* @param requestBody The request body.
* @param model The entity model.
* @return The parsed "ids" field from the request body.
* @throws ValidationException
*/
public static Map<String, Set<String>> parseIds(JsonNode requestBody, Model model) throws ValidationException {
Map<String, Set<String>> idsObj = new TreeMap<>();
if (!requestBody.has("ids") || requestBody.get("ids").size() == 0)
return idsObj;
JsonNode ids = requestBody.get("ids");
Iterator<Map.Entry<String, JsonNode>> indices = ids.fields();
while (indices.hasNext()) {
Map.Entry<String, JsonNode> index = indices.next();
String indexName = index.getKey();
JsonNode idsValues = index.getValue();

// Validate that the index exists in the entity model.
if (!model.indices().containsKey(indexName))
throw new ValidationException("'ids." + indexName + "' is not defined in the entity model.");

// Parse the id values.
idsObj.put(indexName, new TreeSet<>());
if (!idsValues.isNull() && !idsValues.isArray())
throw new ValidationException("'ids." + indexName + "' must be an array.");
Iterator<JsonNode> idsNode = idsValues.elements();
while (idsNode.hasNext()) {
JsonNode idNode = idsNode.next();
if (!idNode.isTextual())
throw new ValidationException("'ids." + indexName + "' must be an array of strings.");
String id = idNode.asText();
if (Patterns.EMPTY_STRING.matcher(id).matches())
throw new ValidationException("'ids." + indexName + "' must be an array of non-empty strings.");
idsObj.get(indexName).add(Json.quoteString(id));
}
}
return idsObj;
}

/**
* Parse and validate the "attributes" field of the request body.
*
Expand All @@ -137,12 +179,10 @@ public static Model includeResolvers(Model model, Set<String> resolvers) throws
* @throws JsonProcessingException
*/
public static Map<String, Attribute> parseAttributes(JsonNode requestBody, Model model) throws ValidationException, JsonProcessingException {
if (!requestBody.has("attributes"))
throw new ValidationException("The 'attributes' field is missing from the request body.");
if (requestBody.get("attributes").size() == 0)
throw new ValidationException("The 'attributes' field of the request body must not be empty.");
JsonNode attributes = requestBody.get("attributes");
Map<String, Attribute> attributesObj = new TreeMap<>();
if (!requestBody.has("attributes") || requestBody.get("attributes").size() == 0)
return attributesObj;
JsonNode attributes = requestBody.get("attributes");
Iterator<String> attributeFields = attributes.fieldNames();
while (attributeFields.hasNext()) {
String attributeName = attributeFields.next();
Expand Down Expand Up @@ -219,6 +259,10 @@ public Map<String, Attribute> attributes() {
return this.attributes;
}

public Map<String, Set<String>> ids() {
return this.ids;
}

public Model model() {
return this.model;
}
Expand All @@ -243,6 +287,13 @@ public void deserialize(JsonNode json) throws ValidationException, IOException {
// Parse and validate the "attributes" field of the request body.
this.attributes = parseAttributes(json, this.model);

// Parse and validate the "ids" field of the request body.
this.ids = parseIds(json, this.model);

// Ensure that either the "attributes" or "ids" field exists and is valid.
if (this.attributes().isEmpty() && this.ids.isEmpty())
throw new ValidationException("The 'attributes' and 'ids' fields are missing from the request body. At least one must exist.");

// Parse and validate the "scope" field of the request body.
if (json.has("scope")) {
this.scope.deserialize(json.get("scope"), this.model);
Expand Down
Expand Up @@ -65,10 +65,8 @@ protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient
String model = getResponse.getSourceAsString();
input = new Input(body, new Model(model));
}
if (input.attributes().isEmpty())
throw new ValidationException("'attributes' is missing.");
if (input.model() == null)
throw new ValidationException("'model' is missing.");
throw new ValidationException("An entity model was not given for this request.");

// Prepare the entity resolution job.
Job job = new Job(client);
Expand Down
101 changes: 98 additions & 3 deletions src/test/java/io/zentity/resolution/JobIT.java
Expand Up @@ -19,7 +19,7 @@ public class JobIT extends AbstractITCase {

private final Map<String, String> params = Collections.emptyMap();

private final StringEntity TEST_PAYLOAD_JOB = new StringEntity("{\n" +
private final StringEntity TEST_PAYLOAD_JOB_ATTRIBUTES = new StringEntity("{\n" +
" \"attributes\": {\n" +
" \"attribute_a\": [ \"a_00\" ]\n" +
" },\n" +
Expand All @@ -31,6 +31,33 @@ public class JobIT extends AbstractITCase {
" }\n" +
"}", ContentType.APPLICATION_JSON);

private final StringEntity TEST_PAYLOAD_JOB_IDS = new StringEntity("{\n" +
" \"ids\": {\n" +
" \".zentity_test_index_a\": [ \"a0\" ]\n" +
" },\n" +
" \"scope\": {\n" +
" \"include\": {\n" +
" \"indices\": [ \".zentity_test_index_a\", \".zentity_test_index_b\", \".zentity_test_index_c\" ],\n" +
" \"resolvers\": [ \"resolver_a\", \"resolver_b\" ]\n" +
" }\n" +
" }\n" +
"}", ContentType.APPLICATION_JSON);

private final StringEntity TEST_PAYLOAD_JOB_ATTRIBUTES_IDS = new StringEntity("{\n" +
" \"attributes\": {\n" +
" \"attribute_a\": [ \"a_00\" ]\n" +
" },\n" +
" \"ids\": {\n" +
" \".zentity_test_index_a\": [ \"a6\" ]\n" +
" },\n" +
" \"scope\": {\n" +
" \"include\": {\n" +
" \"indices\": [ \".zentity_test_index_a\", \".zentity_test_index_b\", \".zentity_test_index_c\" ],\n" +
" \"resolvers\": [ \"resolver_a\", \"resolver_b\" ]\n" +
" }\n" +
" }\n" +
"}", ContentType.APPLICATION_JSON);

private final StringEntity TEST_PAYLOAD_JOB_MAX_HOPS_AND_DOCS = new StringEntity("{\n" +
" \"attributes\": {\n" +
" \"attribute_d\": { \"values\": [ \"d_00\" ] }\n" +
Expand Down Expand Up @@ -265,11 +292,11 @@ private Set<String> getActual(JsonNode json) {
return docsActual;
}

public void testJob() throws Exception {
public void testJobAttributes() throws Exception {
try {
prepareTestResources();
String endpoint = "_zentity/resolution/zentity_test_entity_a";
Response response = client.performRequest("POST", endpoint, params, TEST_PAYLOAD_JOB);
Response response = client.performRequest("POST", endpoint, params, TEST_PAYLOAD_JOB_ATTRIBUTES);
JsonNode json = Json.MAPPER.readTree(response.getEntity().getContent());
assertEquals(json.get("hits").get("total").asInt(), 6);

Expand All @@ -287,6 +314,74 @@ public void testJob() throws Exception {
}
}

public void testJobIds() throws Exception {
try {
prepareTestResources();
String endpoint = "_zentity/resolution/zentity_test_entity_a";
Response response = client.performRequest("POST", endpoint, params, TEST_PAYLOAD_JOB_IDS);
JsonNode json = Json.MAPPER.readTree(response.getEntity().getContent());
assertEquals(json.get("hits").get("total").asInt(), 6);

Set<String> docsExpected = new TreeSet<>();
docsExpected.add("a0,0");
docsExpected.add("b0,1");
docsExpected.add("c0,2");
docsExpected.add("a1,3");
docsExpected.add("b1,4");
docsExpected.add("c1,5");

assertEquals(docsExpected, getActual(json));
} finally {
destroyTestResources();
}
}

public void testJobAttributesIds() throws Exception {
try {
prepareTestResources();
String endpoint = "_zentity/resolution/zentity_test_entity_a";
Response response = client.performRequest("POST", endpoint, params, TEST_PAYLOAD_JOB_ATTRIBUTES_IDS);
JsonNode json = Json.MAPPER.readTree(response.getEntity().getContent());
assertEquals(json.get("hits").get("total").asInt(), 30);

Set<String> docsExpected = new TreeSet<>();
docsExpected.add("a0,0");
docsExpected.add("a6,0");
docsExpected.add("b0,0");
docsExpected.add("a2,1");
docsExpected.add("a7,1");
docsExpected.add("a8,1");
docsExpected.add("a9,1");
docsExpected.add("b2,1");
docsExpected.add("b6,1");
docsExpected.add("b7,1");
docsExpected.add("b8,1");
docsExpected.add("b9,1");
docsExpected.add("c0,1");
docsExpected.add("c2,1");
docsExpected.add("c6,1");
docsExpected.add("c7,1");
docsExpected.add("c8,1");
docsExpected.add("c9,1");
docsExpected.add("a1,2");
docsExpected.add("a3,2");
docsExpected.add("a4,2");
docsExpected.add("a5,2");
docsExpected.add("b3,2");
docsExpected.add("b4,2");
docsExpected.add("b5,2");
docsExpected.add("c3,2");
docsExpected.add("c4,2");
docsExpected.add("c5,2");
docsExpected.add("b1,3");
docsExpected.add("c1,4");

assertEquals(docsExpected, getActual(json));
} finally {
destroyTestResources();
}
}

public void testJobMaxHopsAndDocs() throws Exception {
try {
prepareTestResources();
Expand Down

0 comments on commit b2f48bc

Please sign in to comment.