Skip to content

Commit

Permalink
Add pipeline name to ingest metadata
Browse files Browse the repository at this point in the history
This commit adds the name of the current pipeline to ingest metadata.
The pipeline name is accessible under the '_ingest.pipeline' ingest metadata key.

Example usage in pipeline:
PUT /_ingest/pipeline/2
{
    "processors": [
        {
            "set": {
                "field": "pipeline_name",
                "value": "{{_ingest.pipeline}}"
            }
        }
    ]
}

Closes elastic#42106
  • Loading branch information
martijnvg committed Dec 23, 2019
1 parent 7203cee commit 37d789f
Show file tree
Hide file tree
Showing 6 changed files with 156 additions and 7 deletions.
2 changes: 2 additions & 0 deletions docs/reference/ingest/processors/pipeline.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ include::common-options.asciidoc[]
--------------------------------------------------
// NOTCONSOLE

The name of the current pipeline can be accessed from the `_ingest.pipeline` ingest metadata key.

An example of using this processor for nesting pipelines would be:

Define an inner pipeline:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,3 +202,81 @@ teardown:
}
- match: { error.root_cause.0.type: "ingest_processor_exception" }
- match: { error.root_cause.0.reason: "java.lang.IllegalStateException: Pipeline processor configured for non-existent pipeline [legal-department]" }

---
"Test _ingest.pipeline metadata":
  # Chains three pipelines together via pipeline processors. Each pipeline
  # appends the value of the `_ingest.pipeline` metadata key to the
  # "pipelines" field, so the indexed document must list all three pipeline
  # ids in execution order.
  - do:
      ingest.put_pipeline:
        id: "pipeline1"
        body: >
          {
            "processors" : [
              {
                "append" : {
                  "field": "pipelines",
                  "value": "{{_ingest.pipeline}}"
                }
              },
              {
                "pipeline" : {
                  "name": "another_pipeline"
                }
              }
            ]
          }
  - match: { acknowledged: true }

  - do:
      ingest.put_pipeline:
        id: "another_pipeline"
        body: >
          {
            "processors" : [
              {
                "append" : {
                  "field": "pipelines",
                  "value": "{{_ingest.pipeline}}"
                }
              },
              {
                "pipeline" : {
                  "name": "another_pipeline2"
                }
              }
            ]
          }
  - match: { acknowledged: true }

  - do:
      ingest.put_pipeline:
        id: "another_pipeline2"
        body: >
          {
            "processors" : [
              {
                "append" : {
                  "field": "pipelines",
                  "value": "{{_ingest.pipeline}}"
                }
              }
            ]
          }
  - match: { acknowledged: true }

  # Index an empty document through the entry pipeline of the chain.
  - do:
      index:
        index: test
        id: 1
        pipeline: "pipeline1"
        body: >
          {
          }
  - do:
      get:
        index: test
        id: 1
  - length: { _source.pipelines: 3 }
  - match: { _source.pipelines.0: "pipeline1" }
  - match: { _source.pipelines.1: "another_pipeline" }
  - match: { _source.pipelines.2: "another_pipeline2" }
Original file line number Diff line number Diff line change
Expand Up @@ -284,26 +284,30 @@ teardown:
- length: { docs.0.processor_results.0.doc._source: 2 }
- match: { docs.0.processor_results.0.doc._source.foo.bar.0.item: "HELLO" }
- match: { docs.0.processor_results.0.doc._source.field2.value: "_value" }
- length: { docs.0.processor_results.0.doc._ingest: 1 }
- length: { docs.0.processor_results.0.doc._ingest: 2 }
- is_true: docs.0.processor_results.0.doc._ingest.timestamp
- is_true: docs.0.processor_results.0.doc._ingest.pipeline
- length: { docs.0.processor_results.1.doc._source: 3 }
- match: { docs.0.processor_results.1.doc._source.foo.bar.0.item: "HELLO" }
- match: { docs.0.processor_results.1.doc._source.field2.value: "_value" }
- match: { docs.0.processor_results.1.doc._source.field3: "third_val" }
- length: { docs.0.processor_results.1.doc._ingest: 1 }
- length: { docs.0.processor_results.1.doc._ingest: 2 }
- is_true: docs.0.processor_results.1.doc._ingest.timestamp
- is_true: docs.0.processor_results.1.doc._ingest.pipeline
- length: { docs.0.processor_results.2.doc._source: 3 }
- match: { docs.0.processor_results.2.doc._source.foo.bar.0.item: "HELLO" }
- match: { docs.0.processor_results.2.doc._source.field2.value: "_VALUE" }
- match: { docs.0.processor_results.2.doc._source.field3: "third_val" }
- length: { docs.0.processor_results.2.doc._ingest: 1 }
- length: { docs.0.processor_results.2.doc._ingest: 2 }
- is_true: docs.0.processor_results.2.doc._ingest.timestamp
- is_true: docs.0.processor_results.2.doc._ingest.pipeline
- length: { docs.0.processor_results.3.doc._source: 3 }
- match: { docs.0.processor_results.3.doc._source.foo.bar.0.item: "hello" }
- match: { docs.0.processor_results.3.doc._source.field2.value: "_VALUE" }
- match: { docs.0.processor_results.3.doc._source.field3: "third_val" }
- length: { docs.0.processor_results.3.doc._ingest: 1 }
- length: { docs.0.processor_results.3.doc._ingest: 2 }
- is_true: docs.0.processor_results.3.doc._ingest.timestamp
- is_true: docs.0.processor_results.3.doc._ingest.pipeline

---
"Test simulate with exception thrown":
Expand Down Expand Up @@ -393,12 +397,14 @@ teardown:
- match: { docs.1.processor_results.0.doc._index: "index" }
- match: { docs.1.processor_results.0.doc._source.foo: 5 }
- match: { docs.1.processor_results.0.doc._source.bar: "hello" }
- length: { docs.1.processor_results.0.doc._ingest: 1 }
- length: { docs.1.processor_results.0.doc._ingest: 2 }
- is_true: docs.1.processor_results.0.doc._ingest.timestamp
- is_true: docs.1.processor_results.0.doc._ingest.pipeline
- match: { docs.1.processor_results.1.doc._source.foo: 5 }
- match: { docs.1.processor_results.1.doc._source.bar: "HELLO" }
- length: { docs.1.processor_results.1.doc._ingest: 1 }
- length: { docs.1.processor_results.1.doc._ingest: 2 }
- is_true: docs.1.processor_results.1.doc._ingest.timestamp
- is_true: docs.1.processor_results.1.doc._ingest.pipeline

---
"Test verbose simulate with on_failure":
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -646,8 +646,14 @@ private static Object deepCopy(Object value) {
*/
public void executePipeline(Pipeline pipeline, BiConsumer<IngestDocument, Exception> handler) {
if (executedPipelines.add(pipeline.getId())) {
Object previousPipeline = ingestMetadata.put("pipeline", pipeline.getId());
pipeline.execute(this, (result, e) -> {
executedPipelines.remove(pipeline.getId());
if (previousPipeline != null) {
ingestMetadata.put("pipeline", previousPipeline);
} else {
ingestMetadata.remove("pipeline");
}
handler.accept(result, e);
});
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public class PipelineProcessor extends AbstractProcessor {
private final TemplateScript.Factory pipelineTemplate;
private final IngestService ingestService;

private PipelineProcessor(String tag, TemplateScript.Factory pipelineTemplate, IngestService ingestService) {
PipelineProcessor(String tag, TemplateScript.Factory pipelineTemplate, IngestService ingestService) {
super(tag);
this.pipelineTemplate = pipelineTemplate;
this.ingestService = ingestService;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,22 @@

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.script.TemplateScript;
import org.elasticsearch.test.ESTestCase;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.LongSupplier;

import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

Expand Down Expand Up @@ -205,6 +210,58 @@ pipeline3Id, null, null, new CompoundProcessor(
assertThat(pipeline3Stats.getIngestFailedCount(), equalTo(1L));
}

public void testIngestPipelineMetadata() {
    // Chain `pipelineCount` pipelines together through pipeline processors.
    // Every pipeline appends the current value of the "pipeline" ingest
    // metadata key to the "pipelines" field, so after execution that field
    // must list each pipeline id in execution order.
    IngestService ingestService = createIngestService();

    final int pipelineCount = 16;
    Pipeline entryPipeline = null;
    for (int i = 0; i < pipelineCount; i++) {
        final String id = Integer.toString(i);
        final List<Processor> steps = new ArrayList<>();
        // Records the currently-executing pipeline name into the document.
        steps.add(new AbstractProcessor(null) {
            @Override
            public IngestDocument execute(final IngestDocument ingestDocument) throws Exception {
                ingestDocument.appendFieldValue("pipelines", ingestDocument.getIngestMetadata().get("pipeline"));
                return ingestDocument;
            }

            @Override
            public String getType() {
                return null;
            }

        });
        // Every pipeline except the last delegates to the next one in the chain.
        if (i != pipelineCount - 1) {
            TemplateScript.Factory nextPipelineName =
                new TestTemplateService.MockTemplateScript.Factory(Integer.toString(i + 1));
            steps.add(new PipelineProcessor(null, nextPipelineName, ingestService));
        }

        final Pipeline pipeline = new Pipeline(id, null, null, new CompoundProcessor(false, steps, List.of()));
        when(ingestService.getPipeline(id)).thenReturn(pipeline);
        if (i == 0) {
            entryPipeline = pipeline;
        }
    }

    final IngestDocument document = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>());
    final IngestDocument[] resultHolder = new IngestDocument[1];
    final Exception[] failureHolder = new Exception[1];
    document.executePipeline(entryPipeline, (result, failure) -> {
        resultHolder[0] = result;
        failureHolder[0] = failure;
    });
    assertThat(resultHolder[0], notNullValue());
    assertThat(failureHolder[0], nullValue());

    final List<?> recordedPipelines = resultHolder[0].getFieldValue("pipelines", List.class);
    assertThat(recordedPipelines.size(), equalTo(pipelineCount));
    for (int i = 0; i < pipelineCount; i++) {
        assertThat(recordedPipelines.get(i), equalTo(Integer.toString(i)));
    }
}

static IngestService createIngestService() {
IngestService ingestService = mock(IngestService.class);
ScriptService scriptService = mock(ScriptService.class);
Expand Down

0 comments on commit 37d789f

Please sign in to comment.