Skip to content

Commit

Permalink
Validate enrich index before completing policy execution (elastic#100106)
Browse files Browse the repository at this point in the history

This PR adds a validation step to the end of an enrich policy run to ensure the integrity of the 
enrich index that is about to be promoted.
  • Loading branch information
jbaiera committed Oct 2, 2023
1 parent 1369ff2 commit 225db31
Show file tree
Hide file tree
Showing 3 changed files with 138 additions and 8 deletions.
5 changes: 5 additions & 0 deletions docs/changelog/100106.yaml
@@ -0,0 +1,5 @@
pr: 100106
summary: Validate enrich index before completing policy execution
area: Ingest Node
type: bug
issues: []
Expand Up @@ -9,6 +9,7 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionResponse;
Expand Down Expand Up @@ -43,6 +44,7 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.Maps;
import org.elasticsearch.common.util.iterable.Iterables;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.IndexVersion;
import org.elasticsearch.index.mapper.Mapper;
import org.elasticsearch.index.mapper.MapperService;
Expand Down Expand Up @@ -132,13 +134,9 @@ public void run() {
logger.debug("Policy [{}]: Checking source indices [{}]", policyName, sourceIndices);
GetIndexRequest getIndexRequest = new GetIndexRequest().indices(sourceIndices);
// This call does not set the origin to ensure that the user executing the policy has permission to access the source index
client.admin().indices().getIndex(getIndexRequest, listener.delegateFailure((l, getIndexResponse) -> {
try {
validateMappings(getIndexResponse);
prepareAndCreateEnrichIndex(toMappings(getIndexResponse));
} catch (Exception e) {
l.onFailure(e);
}
client.admin().indices().getIndex(getIndexRequest, listener.delegateFailureAndWrap((l, getIndexResponse) -> {
validateMappings(getIndexResponse);
prepareAndCreateEnrichIndex(toMappings(getIndexResponse));
}));
} catch (Exception e) {
listener.onFailure(e);
Expand Down Expand Up @@ -624,14 +622,66 @@ private void waitForIndexGreen(final String destinationIndexName) {
ClusterHealthRequest request = new ClusterHealthRequest(destinationIndexName).waitForGreenStatus();
enrichOriginClient().admin()
.cluster()
.health(request, listener.delegateFailure((l, r) -> updateEnrichPolicyAlias(destinationIndexName)));
.health(request, listener.delegateFailureAndWrap((l, r) -> updateEnrichPolicyAlias(destinationIndexName)));
}

/**
 * Last-chance integrity check for the enrich index that a policy run is about to promote. The index could have been removed and
 * recreated (with the wrong mappings or data) while the policy was still executing, so before promotion we confirm that the index is
 * present in the given cluster state, that it has a mapping, and that the mapping's {@code _meta} section still carries the name of
 * the policy being executed.
 *
 * @param destinationIndexName name of the enrich index that is about to be promoted
 * @param clusterState         cluster state in which the index metadata is looked up
 * @throws IndexNotFoundException    if the cluster state has no metadata for the index
 * @throws ResourceNotFoundException if the index has no mapping
 * @throws ElasticsearchException    if the mapping's {@code _meta} entry is not a map, or its policy name entry is absent or differs
 *                                   from the policy being executed
 */
private void validateIndexBeforePromotion(String destinationIndexName, ClusterState clusterState) {
    // 1. The index itself must still be part of the cluster state.
    final IndexMetadata indexMetadata = clusterState.metadata().index(destinationIndexName);
    if (indexMetadata == null) {
        throw new IndexNotFoundException(
            "was not able to promote it as part of executing enrich policy [" + policyName + "]",
            destinationIndexName
        );
    }

    // 2. It must have a mapping at all.
    final MappingMetadata mappingMetadata = indexMetadata.mapping();
    if (mappingMetadata == null) {
        throw new ResourceNotFoundException(
            "Could not locate mapping for enrich index [{}] while completing [{}] policy run",
            destinationIndexName,
            policyName
        );
    }

    // 3. The mapping must contain a "_meta" object (a null or non-map value is treated as missing).
    final Object meta = mappingMetadata.sourceAsMap().get("_meta");
    if ((meta instanceof Map<?, ?>) == false) {
        throw new ElasticsearchException(
            "Could not verify enrich index [{}] metadata before completing [{}] policy run: mapping meta field missing",
            destinationIndexName,
            policyName
        );
    }

    // 4. The "_meta" object must name a policy...
    final Object recordedPolicyName = ((Map<?, ?>) meta).get(ENRICH_POLICY_NAME_FIELD_NAME);
    if (recordedPolicyName == null) {
        throw new ElasticsearchException(
            "Could not verify enrich index [{}] metadata before completing [{}] policy run: policy name meta field missing",
            destinationIndexName,
            policyName
        );
    }

    // 5. ...and that policy must be the one this runner is executing.
    if (policyName.equals(recordedPolicyName) == false) {
        throw new ElasticsearchException(
            "Could not verify enrich index [{}] metadata before completing [{}] policy run: policy name meta field does not "
                + "match expected value of [{}], was [{}]",
            destinationIndexName,
            policyName,
            policyName,
            recordedPolicyName.toString()
        );
    }
}

private void updateEnrichPolicyAlias(final String destinationIndexName) {
String enrichIndexBase = EnrichPolicy.getBaseName(policyName);
logger.debug("Policy [{}]: Promoting new enrich index [{}] to alias [{}]", policyName, destinationIndexName, enrichIndexBase);
GetAliasesRequest aliasRequest = new GetAliasesRequest(enrichIndexBase);
ClusterState clusterState = clusterService.state();
validateIndexBeforePromotion(destinationIndexName, clusterState);
String[] concreteIndices = indexNameExpressionResolver.concreteIndexNamesWithSystemIndexAccess(clusterState, aliasRequest);
String[] aliases = aliasRequest.aliases();
IndicesAliasesRequest aliasToggleRequest = new IndicesAliasesRequest();
Expand Down
Expand Up @@ -17,6 +17,8 @@
import org.elasticsearch.action.admin.indices.create.CreateIndexAction;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexAction;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeAction;
import org.elasticsearch.action.admin.indices.get.GetIndexAction;
import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
Expand Down Expand Up @@ -2253,6 +2255,79 @@ public void testEnrichNestedField() throws Exception {
""");
}

/**
 * Verifies that a policy run fails when the enrich index under construction is deleted part-way through execution: the runner is
 * expected to report an {@link ElasticsearchException} saying the mapping meta field is missing, rather than promoting the index.
 */
public void testRunnerValidatesIndexIntegrity() throws Exception {
    final String sourceIndex = "source-index";

    // Seed the source index with a single document and make it immediately searchable.
    IndexRequest seedRequest = new IndexRequest().index(sourceIndex).id("id").source("""
        {
        "field1": "value1",
        "field2": 2,
        "field3": "ignored",
        "field4": "ignored",
        "field5": "value5"
        }""", XContentType.JSON).setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
    IndexResponse seedResponse = client().index(seedRequest).actionGet();
    assertEquals(RestStatus.CREATED, seedResponse.status());

    // Read the document back to confirm the source index looks the way the policy expects.
    SearchRequest matchAllRequest = new SearchRequest(sourceIndex).source(
        SearchSourceBuilder.searchSource().query(QueryBuilders.matchAllQuery())
    );
    SearchResponse sourceSearchResponse = client().search(matchAllRequest).actionGet();
    assertThat(sourceSearchResponse.getHits().getTotalHits().value, equalTo(1L));
    Map<String, Object> seededDocument = sourceSearchResponse.getHits().getAt(0).getSourceAsMap();
    assertNotNull(seededDocument);
    assertThat(seededDocument.get("field1"), is(equalTo("value1")));
    assertThat(seededDocument.get("field2"), is(equalTo(2)));
    assertThat(seededDocument.get("field3"), is(equalTo("ignored")));
    assertThat(seededDocument.get("field4"), is(equalTo("ignored")));
    assertThat(seededDocument.get("field5"), is(equalTo("value5")));

    // Match policy keyed on field1 that enriches with field2 and field5.
    String policyName = "test1";
    List<String> enrichFields = List.of("field2", "field5");
    EnrichPolicy policy = new EnrichPolicy(EnrichPolicy.MATCH_TYPE, null, List.of(sourceIndex), "field1", enrichFields);

    final long createTime = randomNonNegativeLong();
    String createdEnrichIndex = ".enrich-test1-" + createTime;

    // The listener records any failure and releases the latch once the run has finished.
    final CountDownLatch latch = new CountDownLatch(1);
    final AtomicReference<Exception> exception = new AtomicReference<>();
    ActionListener<ExecuteEnrichPolicyStatus> listener = createTestListener(latch, exception::set);

    // Sabotage the run: intercept the reindex action, delete the enrich index first, and only then let the reindex proceed. This
    // leaves the resulting index in an invalid state by the time the runner tries to promote it.
    Client client = new FilterClient(client()) {
        @Override
        protected <Request extends ActionRequest, Response extends ActionResponse> void doExecute(
            ActionType<Response> action,
            Request request,
            ActionListener<Response> listener
        ) {
            // Every action other than the enrich reindex passes straight through.
            if (action.equals(EnrichReindexAction.INSTANCE) == false) {
                super.doExecute(action, request, listener);
                return;
            }
            DeleteIndexRequest deleteEnrichIndex = new DeleteIndexRequest(createdEnrichIndex);
            super.doExecute(DeleteIndexAction.INSTANCE, deleteEnrichIndex, listener.delegateFailureAndWrap((delegate, response) -> {
                if (response.isAcknowledged() == false) {
                    fail("Enrich index should have been deleted but was not");
                }
                // Resume the intercepted reindex now that the index is gone.
                super.doExecute(action, request, delegate);
            }));
        }
    };
    EnrichPolicyRunner enrichPolicyRunner = createPolicyRunner(client, policyName, policy, listener, createdEnrichIndex);

    logger.info("Starting policy run");
    enrichPolicyRunner.run();
    latch.await();

    // The run must have failed, and specifically because the promoted index lost its mapping metadata.
    Exception runnerException = exception.get();
    assertNotNull("Expected the runner to fail when the underlying index was deleted during policy execution!", runnerException);
    assertThat(runnerException, is(instanceOf(ElasticsearchException.class)));
    assertThat(runnerException.getMessage(), containsString("Could not verify enrich index"));
    assertThat(runnerException.getMessage(), containsString("mapping meta field missing"));
}

private EnrichPolicyRunner createPolicyRunner(
String policyName,
EnrichPolicy policy,
Expand Down

0 comments on commit 225db31

Please sign in to comment.