From 225db3190a90643fccacf771442e279634397ef0 Mon Sep 17 00:00:00 2001 From: James Baiera Date: Mon, 2 Oct 2023 16:14:19 -0400 Subject: [PATCH] Validate enrich index before completing policy execution (#100106) This PR adds a validation step to the end of an enrich policy run to ensure the integrity of the enrich index that is about to be promoted. --- docs/changelog/100106.yaml | 5 ++ .../xpack/enrich/EnrichPolicyRunner.java | 66 ++++++++++++++-- .../xpack/enrich/EnrichPolicyRunnerTests.java | 75 +++++++++++++++++++ 3 files changed, 138 insertions(+), 8 deletions(-) create mode 100644 docs/changelog/100106.yaml diff --git a/docs/changelog/100106.yaml b/docs/changelog/100106.yaml new file mode 100644 index 0000000000000..c3e3d50d2597a --- /dev/null +++ b/docs/changelog/100106.yaml @@ -0,0 +1,5 @@ +pr: 100106 +summary: Validate enrich index before completing policy execution +area: Ingest Node +type: bug +issues: [] diff --git a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPolicyRunner.java b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPolicyRunner.java index a3c3e65171d83..3f77dc9d722ab 100644 --- a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPolicyRunner.java +++ b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPolicyRunner.java @@ -9,6 +9,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.ActionResponse; @@ -43,6 +44,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.Maps; import org.elasticsearch.common.util.iterable.Iterables; +import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.IndexVersion; import org.elasticsearch.index.mapper.Mapper; import org.elasticsearch.index.mapper.MapperService; @@ -132,13 +134,9 @@ public void run() { logger.debug("Policy [{}]: Checking source indices [{}]", policyName, sourceIndices); GetIndexRequest getIndexRequest = new GetIndexRequest().indices(sourceIndices); // This call does not set the origin to ensure that the user executing the policy has permission to access the source index - client.admin().indices().getIndex(getIndexRequest, listener.delegateFailure((l, getIndexResponse) -> { - try { - validateMappings(getIndexResponse); - prepareAndCreateEnrichIndex(toMappings(getIndexResponse)); - } catch (Exception e) { - l.onFailure(e); - } + client.admin().indices().getIndex(getIndexRequest, listener.delegateFailureAndWrap((l, getIndexResponse) -> { + validateMappings(getIndexResponse); + prepareAndCreateEnrichIndex(toMappings(getIndexResponse)); })); } catch (Exception e) { listener.onFailure(e); @@ -624,7 +622,58 @@ private void waitForIndexGreen(final String destinationIndexName) { ClusterHealthRequest request = new ClusterHealthRequest(destinationIndexName).waitForGreenStatus(); enrichOriginClient().admin() .cluster() - .health(request, listener.delegateFailure((l, r) -> updateEnrichPolicyAlias(destinationIndexName))); + .health(request, listener.delegateFailureAndWrap((l, r) -> updateEnrichPolicyAlias(destinationIndexName))); + } + + /** + * Ensures that the index we are about to promote at the end of a policy execution exists, is intact, and has not been damaged + * during the policy execution. In some cases, it is possible for the index being constructed to be deleted during the policy execution + * and recreated with invalid mappings/data. We validate that the mapping exists and that it contains the expected meta fields on it to + * guard against accidental removal and recreation during policy execution. + */ + private void validateIndexBeforePromotion(String destinationIndexName, ClusterState clusterState) { + IndexMetadata destinationIndex = clusterState.metadata().index(destinationIndexName); + if (destinationIndex == null) { + throw new IndexNotFoundException( + "was not able to promote it as part of executing enrich policy [" + policyName + "]", + destinationIndexName + ); + } + MappingMetadata mapping = destinationIndex.mapping(); + if (mapping == null) { + throw new ResourceNotFoundException( + "Could not locate mapping for enrich index [{}] while completing [{}] policy run", + destinationIndexName, + policyName + ); + } + Map mappingSource = mapping.sourceAsMap(); + Object meta = mappingSource.get("_meta"); + if (meta instanceof Map metaMap) { + Object policyNameMetaField = metaMap.get(ENRICH_POLICY_NAME_FIELD_NAME); + if (policyNameMetaField == null) { + throw new ElasticsearchException( + "Could not verify enrich index [{}] metadata before completing [{}] policy run: policy name meta field missing", + destinationIndexName, + policyName + ); + } else if (policyName.equals(policyNameMetaField) == false) { + throw new ElasticsearchException( + "Could not verify enrich index [{}] metadata before completing [{}] policy run: policy name meta field does not " + + "match expected value of [{}], was [{}]", + destinationIndexName, + policyName, + policyName, + policyNameMetaField.toString() + ); + } + } else { + throw new ElasticsearchException( + "Could not verify enrich index [{}] metadata before completing [{}] policy run: mapping meta field missing", + destinationIndexName, + policyName + ); + } } private void updateEnrichPolicyAlias(final String destinationIndexName) { @@ -632,6 +681,7 @@ private void updateEnrichPolicyAlias(final String destinationIndexName) { logger.debug("Policy [{}]: Promoting new enrich index [{}] to alias [{}]", policyName, destinationIndexName, enrichIndexBase); GetAliasesRequest aliasRequest = new GetAliasesRequest(enrichIndexBase); ClusterState clusterState = clusterService.state(); + validateIndexBeforePromotion(destinationIndexName, clusterState); String[] concreteIndices = indexNameExpressionResolver.concreteIndexNamesWithSystemIndexAccess(clusterState, aliasRequest); String[] aliases = aliasRequest.aliases(); IndicesAliasesRequest aliasToggleRequest = new IndicesAliasesRequest(); diff --git a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichPolicyRunnerTests.java b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichPolicyRunnerTests.java index 9e62ed225ccf0..2e247477ecde5 100644 --- a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichPolicyRunnerTests.java +++ b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichPolicyRunnerTests.java @@ -17,6 +17,8 @@ import org.elasticsearch.action.admin.indices.create.CreateIndexAction; import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; +import org.elasticsearch.action.admin.indices.delete.DeleteIndexAction; +import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest; import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeAction; import org.elasticsearch.action.admin.indices.get.GetIndexAction; import org.elasticsearch.action.admin.indices.get.GetIndexRequest; @@ -2253,6 +2255,79 @@ public void testEnrichNestedField() throws Exception { """); } + public void testRunnerValidatesIndexIntegrity() throws Exception { + final String sourceIndex = "source-index"; + IndexResponse indexRequest = client().index(new IndexRequest().index(sourceIndex).id("id").source(""" + { + "field1": "value1", + "field2": 2, + "field3": "ignored", + "field4": "ignored", + "field5": "value5" + }""", XContentType.JSON).setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)).actionGet(); + assertEquals(RestStatus.CREATED, indexRequest.status()); + + SearchResponse sourceSearchResponse = client().search( + new SearchRequest(sourceIndex).source(SearchSourceBuilder.searchSource().query(QueryBuilders.matchAllQuery())) + ).actionGet(); + assertThat(sourceSearchResponse.getHits().getTotalHits().value, equalTo(1L)); + Map sourceDocMap = sourceSearchResponse.getHits().getAt(0).getSourceAsMap(); + assertNotNull(sourceDocMap); + assertThat(sourceDocMap.get("field1"), is(equalTo("value1"))); + assertThat(sourceDocMap.get("field2"), is(equalTo(2))); + assertThat(sourceDocMap.get("field3"), is(equalTo("ignored"))); + assertThat(sourceDocMap.get("field4"), is(equalTo("ignored"))); + assertThat(sourceDocMap.get("field5"), is(equalTo("value5"))); + + List enrichFields = List.of("field2", "field5"); + EnrichPolicy policy = new EnrichPolicy(EnrichPolicy.MATCH_TYPE, null, List.of(sourceIndex), "field1", enrichFields); + String policyName = "test1"; + + final long createTime = randomNonNegativeLong(); + String createdEnrichIndex = ".enrich-test1-" + createTime; + final AtomicReference exception = new AtomicReference<>(); + final CountDownLatch latch = new CountDownLatch(1); + ActionListener listener = createTestListener(latch, exception::set); + + // Wrap the client so that when we receive the reindex action, we delete the index then resume operation. This mimics an invalid + // state for the resulting index. + Client client = new FilterClient(client()) { + @Override + protected void doExecute( + ActionType action, + Request request, + ActionListener listener + ) { + if (action.equals(EnrichReindexAction.INSTANCE)) { + super.doExecute( + DeleteIndexAction.INSTANCE, + new DeleteIndexRequest(createdEnrichIndex), + listener.delegateFailureAndWrap((delegate, response) -> { + if (response.isAcknowledged() == false) { + fail("Enrich index should have been deleted but was not"); + } + super.doExecute(action, request, delegate); + }) + ); + } else { + super.doExecute(action, request, listener); + } + } + }; + EnrichPolicyRunner enrichPolicyRunner = createPolicyRunner(client, policyName, policy, listener, createdEnrichIndex); + + logger.info("Starting policy run"); + enrichPolicyRunner.run(); + latch.await(); + Exception runnerException = exception.get(); + if (runnerException == null) { + fail("Expected the runner to fail when the underlying index was deleted during policy execution!"); + } + assertThat(runnerException, is(instanceOf(ElasticsearchException.class))); + assertThat(runnerException.getMessage(), containsString("Could not verify enrich index")); + assertThat(runnerException.getMessage(), containsString("mapping meta field missing")); + } + private EnrichPolicyRunner createPolicyRunner( String policyName, EnrichPolicy policy,