diff --git a/docs/reference/ingest/processors/pipeline.asciidoc b/docs/reference/ingest/processors/pipeline.asciidoc index 7f1ea2885e69a..8a8b0310142d8 100644 --- a/docs/reference/ingest/processors/pipeline.asciidoc +++ b/docs/reference/ingest/processors/pipeline.asciidoc @@ -21,6 +21,8 @@ include::common-options.asciidoc[] -------------------------------------------------- // NOTCONSOLE +The name of the current pipeline can be accessed from the `_ingest.pipeline` ingest metadata key. + An example of using this processor for nesting pipelines would be: Define an inner pipeline: diff --git a/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/210_pipeline_processor.yml b/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/210_pipeline_processor.yml index 76dbc180fa0e5..216568f483e8b 100644 --- a/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/210_pipeline_processor.yml +++ b/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/210_pipeline_processor.yml @@ -202,3 +202,81 @@ teardown: } - match: { error.root_cause.0.type: "ingest_processor_exception" } - match: { error.root_cause.0.reason: "java.lang.IllegalStateException: Pipeline processor configured for non-existent pipeline [legal-department]" } + +--- +"Test _ingest.pipeline metadata": + - do: + ingest.put_pipeline: + id: "pipeline1" + body: > + { + "processors" : [ + { + "append" : { + "field": "pipelines", + "value": "{{_ingest.pipeline}}" + } + }, + { + "pipeline" : { + "name": "another_pipeline" + } + } + ] + } + - match: { acknowledged: true } + + - do: + ingest.put_pipeline: + id: "another_pipeline" + body: > + { + "processors" : [ + { + "append" : { + "field": "pipelines", + "value": "{{_ingest.pipeline}}" + } + }, + { + "pipeline" : { + "name": "another_pipeline2" + } + } + ] + } + - match: { acknowledged: true } + + - do: + ingest.put_pipeline: + id: "another_pipeline2" + body: > + { + "processors" : [ + { + "append" : { + "field": "pipelines", + "value": "{{_ingest.pipeline}}" + } + } + ] + } + - match: { acknowledged: true } + + - do: + index: + index: test + id: 1 + pipeline: "pipeline1" + body: > + { + } + + - do: + get: + index: test + id: 1 + - length: { _source.pipelines: 3 } + - match: { _source.pipelines.0: "pipeline1" } + - match: { _source.pipelines.1: "another_pipeline" } + - match: { _source.pipelines.2: "another_pipeline2" } diff --git a/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/90_simulate.yml b/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/90_simulate.yml index 456a2ba15dd4c..6203326e1c2f9 100644 --- a/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/90_simulate.yml +++ b/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/90_simulate.yml @@ -284,26 +284,30 @@ teardown: - length: { docs.0.processor_results.0.doc._source: 2 } - match: { docs.0.processor_results.0.doc._source.foo.bar.0.item: "HELLO" } - match: { docs.0.processor_results.0.doc._source.field2.value: "_value" } - - length: { docs.0.processor_results.0.doc._ingest: 1 } + - length: { docs.0.processor_results.0.doc._ingest: 2 } - is_true: docs.0.processor_results.0.doc._ingest.timestamp + - is_true: docs.0.processor_results.0.doc._ingest.pipeline - length: { docs.0.processor_results.1.doc._source: 3 } - match: { docs.0.processor_results.1.doc._source.foo.bar.0.item: "HELLO" } - match: { docs.0.processor_results.1.doc._source.field2.value: "_value" } - match: { docs.0.processor_results.1.doc._source.field3: "third_val" } - - length: { docs.0.processor_results.1.doc._ingest: 1 } + - length: { docs.0.processor_results.1.doc._ingest: 2 } - is_true: docs.0.processor_results.1.doc._ingest.timestamp + - is_true: docs.0.processor_results.1.doc._ingest.pipeline - length: { docs.0.processor_results.2.doc._source: 3 } - match: { docs.0.processor_results.2.doc._source.foo.bar.0.item: "HELLO" } - match: { docs.0.processor_results.2.doc._source.field2.value: "_VALUE" } - match: { docs.0.processor_results.2.doc._source.field3: "third_val" } - - length: { docs.0.processor_results.2.doc._ingest: 1 } + - length: { docs.0.processor_results.2.doc._ingest: 2 } - is_true: docs.0.processor_results.2.doc._ingest.timestamp + - is_true: docs.0.processor_results.2.doc._ingest.pipeline - length: { docs.0.processor_results.3.doc._source: 3 } - match: { docs.0.processor_results.3.doc._source.foo.bar.0.item: "hello" } - match: { docs.0.processor_results.3.doc._source.field2.value: "_VALUE" } - match: { docs.0.processor_results.3.doc._source.field3: "third_val" } - - length: { docs.0.processor_results.3.doc._ingest: 1 } + - length: { docs.0.processor_results.3.doc._ingest: 2 } - is_true: docs.0.processor_results.3.doc._ingest.timestamp + - is_true: docs.0.processor_results.3.doc._ingest.pipeline --- "Test simulate with exception thrown": @@ -393,12 +397,14 @@ teardown: - match: { docs.1.processor_results.0.doc._index: "index" } - match: { docs.1.processor_results.0.doc._source.foo: 5 } - match: { docs.1.processor_results.0.doc._source.bar: "hello" } - - length: { docs.1.processor_results.0.doc._ingest: 1 } + - length: { docs.1.processor_results.0.doc._ingest: 2 } - is_true: docs.1.processor_results.0.doc._ingest.timestamp + - is_true: docs.1.processor_results.0.doc._ingest.pipeline - match: { docs.1.processor_results.1.doc._source.foo: 5 } - match: { docs.1.processor_results.1.doc._source.bar: "HELLO" } - - length: { docs.1.processor_results.1.doc._ingest: 1 } + - length: { docs.1.processor_results.1.doc._ingest: 2 } - is_true: docs.1.processor_results.1.doc._ingest.timestamp + - is_true: docs.1.processor_results.1.doc._ingest.pipeline --- "Test verbose simulate with on_failure": diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java b/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java index 1f0d2b4757128..72adbfae0e6c0 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java @@ -646,8 +646,14 @@ private static Object deepCopy(Object value) { */ public void executePipeline(Pipeline pipeline, BiConsumer handler) { if (executedPipelines.add(pipeline.getId())) { + Object previousPipeline = ingestMetadata.put("pipeline", pipeline.getId()); pipeline.execute(this, (result, e) -> { executedPipelines.remove(pipeline.getId()); + if (previousPipeline != null) { + ingestMetadata.put("pipeline", previousPipeline); + } else { + ingestMetadata.remove("pipeline"); + } handler.accept(result, e); }); } else { diff --git a/server/src/main/java/org/elasticsearch/ingest/PipelineProcessor.java b/server/src/main/java/org/elasticsearch/ingest/PipelineProcessor.java index be02fe24752c1..2ee99d6e5d48f 100644 --- a/server/src/main/java/org/elasticsearch/ingest/PipelineProcessor.java +++ b/server/src/main/java/org/elasticsearch/ingest/PipelineProcessor.java @@ -31,7 +31,7 @@ public class PipelineProcessor extends AbstractProcessor { private final TemplateScript.Factory pipelineTemplate; private final IngestService ingestService; - private PipelineProcessor(String tag, TemplateScript.Factory pipelineTemplate, IngestService ingestService) { + PipelineProcessor(String tag, TemplateScript.Factory pipelineTemplate, IngestService ingestService) { super(tag); this.pipelineTemplate = pipelineTemplate; this.ingestService = ingestService; diff --git a/server/src/test/java/org/elasticsearch/ingest/PipelineProcessorTests.java b/server/src/test/java/org/elasticsearch/ingest/PipelineProcessorTests.java index aebcc28e77d5e..74dba097db792 100644 --- a/server/src/test/java/org/elasticsearch/ingest/PipelineProcessorTests.java +++ b/server/src/test/java/org/elasticsearch/ingest/PipelineProcessorTests.java @@ -20,17 +20,22 @@ import org.elasticsearch.ElasticsearchException; import org.elasticsearch.script.ScriptService; +import org.elasticsearch.script.TemplateScript; import org.elasticsearch.test.ESTestCase; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.function.LongSupplier; import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.notNullValue; +import static org.hamcrest.Matchers.nullValue; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -205,6 +210,58 @@ pipeline3Id, null, null, new CompoundProcessor( assertThat(pipeline3Stats.getIngestFailedCount(), equalTo(1L)); } + public void testIngestPipelineMetadata() { + IngestService ingestService = createIngestService(); + + final int numPipelines = 16; + Pipeline firstPipeline = null; + for (int i = 0; i < numPipelines; i++) { + String pipelineId = Integer.toString(i); + List processors = new ArrayList<>(); + processors.add(new AbstractProcessor(null) { + @Override + public IngestDocument execute(final IngestDocument ingestDocument) throws Exception { + ingestDocument.appendFieldValue("pipelines", ingestDocument.getIngestMetadata().get("pipeline")); + return ingestDocument; + } + + @Override + public String getType() { + return null; + } + + }); + if (i < (numPipelines - 1)) { + TemplateScript.Factory pipelineName = new TestTemplateService.MockTemplateScript.Factory(Integer.toString(i + 1)); + processors.add(new PipelineProcessor(null, pipelineName, ingestService)); + } + + + Pipeline pipeline = new Pipeline(pipelineId, null, null, new CompoundProcessor(false, processors, List.of())); + when(ingestService.getPipeline(pipelineId)).thenReturn(pipeline); + if (firstPipeline == null) { + firstPipeline = pipeline; + } + } + + IngestDocument testIngestDocument = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>()); + IngestDocument[] docHolder = new IngestDocument[1]; + Exception[] errorHolder = new Exception[1]; + testIngestDocument.executePipeline(firstPipeline, (doc, e) -> { + docHolder[0] = doc; + errorHolder[0] = e; + }); + assertThat(docHolder[0], notNullValue()); + assertThat(errorHolder[0], nullValue()); + + IngestDocument ingestDocument = docHolder[0]; + List pipelines = ingestDocument.getFieldValue("pipelines", List.class); + assertThat(pipelines.size(), equalTo(numPipelines)); + for (int i = 0; i < numPipelines; i++) { + assertThat(pipelines.get(i), equalTo(Integer.toString(i))); + } + } + static IngestService createIngestService() { IngestService ingestService = mock(IngestService.class); ScriptService scriptService = mock(ScriptService.class);