Skip to content

Commit

Permalink
introduce KV Processor in Ingest Node
Browse files Browse the repository at this point in the history
Now you can parse field values of the `key=value` variety and have
`key` be inserted as a field name in an ingest document.

Closes elastic#22222.
  • Loading branch information
talevy committed Dec 20, 2016
1 parent f6b6e4e commit 6426182
Show file tree
Hide file tree
Showing 7 changed files with 411 additions and 9 deletions.
32 changes: 32 additions & 0 deletions docs/reference/ingest/ingest-node.asciidoc
Expand Up @@ -1485,6 +1485,38 @@ Converts a JSON string into a structured JSON object.
}
--------------------------------------------------

[[kv-processor]]
=== KV Processor
This processor helps automatically parse messages (or specific event fields) which are of the foo=bar variety.

For example, if you have a log message which contains `ip=1.2.3.4 error=REFUSED`, you can parse those automatically by configuring:


[source,js]
--------------------------------------------------
{
"kv": {
"field": "message",
"field_split": " ",
"value_split": "="
}
}
--------------------------------------------------

[[kv-options]]
.Kv Options
[options="header"]
|======
| Name | Required | Default | Description
| `field` | yes | - | The field to be parsed
| `field_split` | yes | - | Regex pattern to use for splitting key-value pairs
| `value_split` | yes | - | Regex pattern to use for splitting the key from the value within a key-value pair
| `target_field` | no | `null` | The field to insert the extracted keys into. Defaults to the root of the document
| `include_keys` | no | `null` | List of keys to filter and insert into document. Defaults to including all keys
| `ignore_missing` | no | `false` | If `true` and `field` does not exist or is `null`, the processor quietly exits without modifying the document
|======


[[lowercase-processor]]
=== Lowercase Processor
Converts a string to its lowercase equivalent.
Expand Down
Expand Up @@ -63,6 +63,7 @@ public Map<String, Processor.Factory> getProcessors(Processor.Parameters paramet
processors.put(ScriptProcessor.TYPE, new ScriptProcessor.Factory(parameters.scriptService));
processors.put(DotExpanderProcessor.TYPE, new DotExpanderProcessor.Factory());
processors.put(JsonProcessor.TYPE, new JsonProcessor.Factory());
processors.put(KeyValueProcessor.TYPE, new KeyValueProcessor.Factory());
return Collections.unmodifiableMap(processors);
}

Expand Down
@@ -0,0 +1,127 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.ingest.common;

import org.elasticsearch.ingest.AbstractProcessor;
import org.elasticsearch.ingest.ConfigurationUtils;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.Processor;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;

/**
* The KeyValueProcessor parses and extracts messages of the `key=value` variety into fields with values of the keys.
*/
public final class KeyValueProcessor extends AbstractProcessor {

public static final String TYPE = "kv";

private final String field;
private final String fieldSplit;
private final String valueSplit;
private final List<String> includeKeys;
private final String targetField;
private final boolean ignoreMissing;

KeyValueProcessor(String tag, String field, String fieldSplit, String valueSplit, List<String> includeKeys,
String targetField, boolean ignoreMissing) {
super(tag);
this.field = field;
this.targetField = targetField;
this.fieldSplit = fieldSplit;
this.valueSplit = valueSplit;
this.includeKeys = includeKeys;
this.ignoreMissing = ignoreMissing;
}

String getField() {
return field;
}

String getFieldSplit() {
return fieldSplit;
}

String getValueSplit() {
return valueSplit;
}

List<String> getIncludeKeys() {
return includeKeys;
}

String getTargetField() {
return targetField;
}

boolean isIgnoreMissing() {
return ignoreMissing;
}

public void append(IngestDocument document, String targetField, String value) {
if (document.hasField(targetField)) {
document.appendFieldValue(targetField, value);
} else {
document.setFieldValue(targetField, value);
}
}

@Override
public void execute(IngestDocument document) {
String oldVal = document.getFieldValue(field, String.class, ignoreMissing);

if (oldVal == null && ignoreMissing) {
return;
} else if (oldVal == null) {
throw new IllegalArgumentException("field [" + field + "] is null, cannot extract key-value pairs.");
}

String fieldPathPrefix = (targetField == null) ? "" : targetField + ".";
Arrays.stream(oldVal.split(fieldSplit))
.map((f) -> f.split(valueSplit, 2))
.filter((p) -> includeKeys == null || includeKeys.contains(p[0]))
.forEach((p) -> append(document, fieldPathPrefix + p[0], p[1]));
}

@Override
public String getType() {
return TYPE;
}

public static class Factory implements Processor.Factory {
@Override
public KeyValueProcessor create(Map<String, Processor.Factory> registry, String processorTag,
Map<String, Object> config) throws Exception {
String field = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "field");
String targetField = ConfigurationUtils.readOptionalStringProperty(TYPE, processorTag, config, "target_field");
String fieldSplit = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "field_split");
String valueSplit = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "value_split");
List<String> includeKeys = ConfigurationUtils.readOptionalList(TYPE, processorTag, config, "include_keys");
if (includeKeys != null) {
includeKeys = Collections.unmodifiableList(includeKeys);
}
boolean ignoreMissing = ConfigurationUtils.readBooleanProperty(TYPE, processorTag, config, "ignore_missing", false);
return new KeyValueProcessor(processorTag, field, fieldSplit, valueSplit, includeKeys, targetField, ignoreMissing);
}
}
}
@@ -0,0 +1,102 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.ingest.common;

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.test.ESTestCase;

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.nullValue;

public class KeyValueProcessorFactoryTests extends ESTestCase {

public void testCreateWithDefaults() throws Exception {
KeyValueProcessor.Factory factory = new KeyValueProcessor.Factory();
Map<String, Object> config = new HashMap<>();
config.put("field", "field1");
config.put("field_split", "&");
config.put("value_split", "=");
String processorTag = randomAsciiOfLength(10);
KeyValueProcessor processor = factory.create(null, processorTag, config);
assertThat(processor.getTag(), equalTo(processorTag));
assertThat(processor.getField(), equalTo("field1"));
assertThat(processor.getFieldSplit(), equalTo("&"));
assertThat(processor.getValueSplit(), equalTo("="));
assertThat(processor.getIncludeKeys(), is(nullValue()));
assertThat(processor.getTargetField(), is(nullValue()));
assertFalse(processor.isIgnoreMissing());
}

public void testCreateWithAllFieldsSet() throws Exception {
KeyValueProcessor.Factory factory = new KeyValueProcessor.Factory();
Map<String, Object> config = new HashMap<>();
config.put("field", "field1");
config.put("field_split", "&");
config.put("value_split", "=");
config.put("target_field", "target");
config.put("include_keys", Arrays.asList("a", "b"));
config.put("ignore_missing", true);
String processorTag = randomAsciiOfLength(10);
KeyValueProcessor processor = factory.create(null, processorTag, config);
assertThat(processor.getTag(), equalTo(processorTag));
assertThat(processor.getField(), equalTo("field1"));
assertThat(processor.getFieldSplit(), equalTo("&"));
assertThat(processor.getValueSplit(), equalTo("="));
assertThat(processor.getIncludeKeys(), equalTo(Arrays.asList("a", "b")));
assertThat(processor.getTargetField(), equalTo("target"));
assertTrue(processor.isIgnoreMissing());
}

public void testCreateWithMissingField() {
KeyValueProcessor.Factory factory = new KeyValueProcessor.Factory();
Map<String, Object> config = new HashMap<>();
String processorTag = randomAsciiOfLength(10);
ElasticsearchException exception = expectThrows(ElasticsearchParseException.class,
() -> factory.create(null, processorTag, config));
assertThat(exception.getMessage(), equalTo("[field] required property is missing"));
}

public void testCreateWithMissingFieldSplit() {
KeyValueProcessor.Factory factory = new KeyValueProcessor.Factory();
Map<String, Object> config = new HashMap<>();
config.put("field", "field1");
String processorTag = randomAsciiOfLength(10);
ElasticsearchException exception = expectThrows(ElasticsearchParseException.class,
() -> factory.create(null, processorTag, config));
assertThat(exception.getMessage(), equalTo("[field_split] required property is missing"));
}

public void testCreateWithMissingValueSplit() {
KeyValueProcessor.Factory factory = new KeyValueProcessor.Factory();
Map<String, Object> config = new HashMap<>();
config.put("field", "field1");
config.put("field_split", "&");
String processorTag = randomAsciiOfLength(10);
ElasticsearchException exception = expectThrows(ElasticsearchParseException.class,
() -> factory.create(null, processorTag, config));
assertThat(exception.getMessage(), equalTo("[value_split] required property is missing"));
}
}
@@ -0,0 +1,96 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.ingest.common;

import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.Processor;
import org.elasticsearch.ingest.RandomDocumentPicks;
import org.elasticsearch.test.ESTestCase;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;

import static org.elasticsearch.ingest.IngestDocumentMatcher.assertIngestDocument;
import static org.hamcrest.Matchers.equalTo;

public class KeyValueProcessorTests extends ESTestCase {

public void test() throws Exception {
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random());
String fieldName = RandomDocumentPicks.addRandomField(random(), ingestDocument, "first=hello&second=world&second=universe");
Processor processor = new KeyValueProcessor(randomAsciiOfLength(10), fieldName, "&", "=", null, "target", false);
processor.execute(ingestDocument);
assertThat(ingestDocument.getFieldValue("target.first", String.class), equalTo("hello"));
assertThat(ingestDocument.getFieldValue("target.second", List.class), equalTo(Arrays.asList("world", "universe")));
}

public void testRootTarget() throws Exception {
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), Collections.emptyMap());
ingestDocument.setFieldValue("myField", "first=hello&second=world&second=universe");
Processor processor = new KeyValueProcessor(randomAsciiOfLength(10), "myField", "&", "=", null, null, false);
processor.execute(ingestDocument);
assertThat(ingestDocument.getFieldValue("first", String.class), equalTo("hello"));
assertThat(ingestDocument.getFieldValue("second", List.class), equalTo(Arrays.asList("world", "universe")));
}

public void testKeySameAsSourceField() throws Exception {
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), Collections.emptyMap());
ingestDocument.setFieldValue("first", "first=hello");
Processor processor = new KeyValueProcessor(randomAsciiOfLength(10), "first", "&", "=", null, null, false);
processor.execute(ingestDocument);
assertThat(ingestDocument.getFieldValue("first", List.class), equalTo(Arrays.asList("first=hello", "hello")));
}

public void testIncludeKeys() throws Exception {
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random());
String fieldName = RandomDocumentPicks.addRandomField(random(), ingestDocument, "first=hello&second=world&second=universe");
Processor processor = new KeyValueProcessor(randomAsciiOfLength(10), fieldName, "&", "=",
Collections.singletonList("first"), "target", false);
processor.execute(ingestDocument);
assertThat(ingestDocument.getFieldValue("target.first", String.class), equalTo("hello"));
assertFalse(ingestDocument.hasField("target.second"));
}

public void testMissingField() {
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), Collections.emptyMap());
Processor processor = new KeyValueProcessor(randomAsciiOfLength(10), "unknown", "&", "=", null, "target", false);
IllegalArgumentException exception = expectThrows(IllegalArgumentException.class, () -> processor.execute(ingestDocument));
assertThat(exception.getMessage(), equalTo("field [unknown] not present as part of path [unknown]"));
}

public void testNullValueWithIgnoreMissing() throws Exception {
String fieldName = RandomDocumentPicks.randomFieldName(random());
IngestDocument originalIngestDocument = RandomDocumentPicks.randomIngestDocument(random(),
Collections.singletonMap(fieldName, null));
IngestDocument ingestDocument = new IngestDocument(originalIngestDocument);
Processor processor = new KeyValueProcessor(randomAsciiOfLength(10), fieldName, "", "", null, "target", true);
processor.execute(ingestDocument);
assertIngestDocument(originalIngestDocument, ingestDocument);
}

public void testNonExistentWithIgnoreMissing() throws Exception {
IngestDocument originalIngestDocument = RandomDocumentPicks.randomIngestDocument(random(), Collections.emptyMap());
IngestDocument ingestDocument = new IngestDocument(originalIngestDocument);
Processor processor = new KeyValueProcessor(randomAsciiOfLength(10), "unknown", "", "", null, "target", true);
processor.execute(ingestDocument);
assertIngestDocument(originalIngestDocument, ingestDocument);
}
}
Expand Up @@ -20,12 +20,13 @@
- match: { nodes.$master.ingest.processors.8.type: gsub }
- match: { nodes.$master.ingest.processors.9.type: join }
- match: { nodes.$master.ingest.processors.10.type: json }
- match: { nodes.$master.ingest.processors.11.type: lowercase }
- match: { nodes.$master.ingest.processors.12.type: remove }
- match: { nodes.$master.ingest.processors.13.type: rename }
- match: { nodes.$master.ingest.processors.14.type: script }
- match: { nodes.$master.ingest.processors.15.type: set }
- match: { nodes.$master.ingest.processors.16.type: sort }
- match: { nodes.$master.ingest.processors.17.type: split }
- match: { nodes.$master.ingest.processors.18.type: trim }
- match: { nodes.$master.ingest.processors.19.type: uppercase }
- match: { nodes.$master.ingest.processors.11.type: kv }
- match: { nodes.$master.ingest.processors.12.type: lowercase }
- match: { nodes.$master.ingest.processors.13.type: remove }
- match: { nodes.$master.ingest.processors.14.type: rename }
- match: { nodes.$master.ingest.processors.15.type: script }
- match: { nodes.$master.ingest.processors.16.type: set }
- match: { nodes.$master.ingest.processors.17.type: sort }
- match: { nodes.$master.ingest.processors.18.type: split }
- match: { nodes.$master.ingest.processors.19.type: trim }
- match: { nodes.$master.ingest.processors.20.type: uppercase }

0 comments on commit 6426182

Please sign in to comment.