diff --git a/flume-core/src/main/java/com/cloudera/flume/handlers/endtoend/ValueDecorator.java b/flume-core/src/main/java/com/cloudera/flume/handlers/endtoend/ValueDecorator.java index 95d32492..70c83d3b 100644 --- a/flume-core/src/main/java/com/cloudera/flume/handlers/endtoend/ValueDecorator.java +++ b/flume-core/src/main/java/com/cloudera/flume/handlers/endtoend/ValueDecorator.java @@ -28,20 +28,33 @@ /** * This decorator adds values to events that pass through it. + * By default decorator does not escape value, use "escape=true" to escape it. */ public class ValueDecorator extends EventSinkDecorator { - + public static final String KW_ESCAPE = "escape"; final String attr; // attribute to tag + + // We store the value in two forms for optimization purposes + final String unescapedValue; final byte[] value; - public ValueDecorator(S s, String attr, byte[] value) { + final boolean escape; + + public ValueDecorator(S s, String attr, String value) { + this(s, attr, value, false); + } + + public ValueDecorator(S s, String attr, String value, boolean escape) { super(s); this.attr = attr; - this.value = value.clone(); + this.escape = escape; + this.unescapedValue = value; + this.value = value.getBytes().clone(); } public void append(Event e) throws IOException, InterruptedException { - e.set(attr, value); + byte[] attrVal = escape ? e.escapeString(unescapedValue).getBytes() : value; + e.set(attr, attrVal); super.append(e); } @@ -51,10 +64,12 @@ public static SinkDecoBuilder builder() { public EventSinkDecorator build(Context context, String... argv) { Preconditions.checkArgument(argv.length == 2, - "usage: value(\"attr\", \"value\")"); + "usage: value(\"attr\", \"value\"{, " + KW_ESCAPE + "=true|false})"); String attr = argv[0]; String v = argv[1]; - return new ValueDecorator(null, attr, v.getBytes()); + String escapedArg = context.getValue(KW_ESCAPE); + boolean escape = (escapedArg == null) ? false : Boolean.parseBoolean(escapedArg); + return new ValueDecorator(null, attr, v, escape); } }; diff --git a/flume-core/src/test/java/com/cloudera/flume/handlers/endtoend/TestValueDecorator.java b/flume-core/src/test/java/com/cloudera/flume/handlers/endtoend/TestValueDecorator.java new file mode 100644 index 00000000..be1c498f --- /dev/null +++ b/flume-core/src/test/java/com/cloudera/flume/handlers/endtoend/TestValueDecorator.java @@ -0,0 +1,63 @@ +/** + * Licensed to Cloudera, Inc. under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Cloudera, Inc. licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.cloudera.flume.handlers.endtoend; + +import java.io.IOException; + +import com.cloudera.flume.core.Event; +import com.cloudera.flume.core.EventImpl; +import com.cloudera.flume.handlers.debug.ConsoleEventSink; +import org.junit.Assert; +import org.junit.Test; + +/** + * Some tests to verify value decorator behaviour + */ +public class TestValueDecorator { + @Test + public void testValueDecorator() throws IOException, InterruptedException { + String value = "value%{nanos}"; + ValueDecorator dec = new ValueDecorator(new ConsoleEventSink(), "attr", value, false); + dec.open(); + Event e = new EventImpl("body".getBytes(), 1234567L, Event.Priority.INFO, 87654321L, "host"); + + dec.append(e); + + Assert.assertNotNull("Attribute wasn't added", e.get("attr")); + // Attribute value should NOT be escaped + Assert.assertArrayEquals("Attribute value is incorrect", value.getBytes(), e.get("attr")); + + dec.close(); + } + + @Test + public void testValueDecoratorWithEscape() throws IOException, InterruptedException { + String value = "value-%{nanos}-%{body}"; + ValueDecorator dec = new ValueDecorator(new ConsoleEventSink(), "attr", value, true); + dec.open(); + Event e = new EventImpl("bodyString".getBytes(), 1234567L, Event.Priority.INFO, 87654321L, "host"); + + dec.append(e); + + Assert.assertNotNull("Attribute wasn't added", e.get("attr")); + Assert.assertEquals("Attribute value is incorrect", "value-87654321-bodyString", new String(e.get("attr"))); + + dec.close(); + } +} + diff --git a/flume-core/src/test/java/com/cloudera/flume/handlers/rolling/TestRollRollTags.java b/flume-core/src/test/java/com/cloudera/flume/handlers/rolling/TestRollRollTags.java index 13f164bf..0093020f 100644 --- a/flume-core/src/test/java/com/cloudera/flume/handlers/rolling/TestRollRollTags.java +++ b/flume-core/src/test/java/com/cloudera/flume/handlers/rolling/TestRollRollTags.java @@ -54,11 +54,9 @@ public void testTagConflict() { @Test public void testMaskNoConflict() throws IOException, InterruptedException { MemorySinkSource mem = new MemorySinkSource(); - EventSink s1 = new ValueDecorator(mem, "duped", - "second".getBytes()); + EventSink s1 = new ValueDecorator(mem, "duped", "second"); EventSink s2 = new MaskDecorator(s1, "duped"); - EventSink snk = new ValueDecorator(s2, "duped", - "first".getBytes()); + EventSink snk = new ValueDecorator(s2, "duped", "first"); snk.open(); Event e = new EventImpl("foo".getBytes()); diff --git a/flume-docs/src/docs/UserGuide/Appendix b/flume-docs/src/docs/UserGuide/Appendix index c8a89186..83beb7af 100644 --- a/flume-docs/src/docs/UserGuide/Appendix +++ b/flume-docs/src/docs/UserGuide/Appendix @@ -387,9 +387,12 @@ If this second attempt fails, it throws an exception. This is useful in conjunction with network sinks where connections can be broken. The open/ close retry attempt is often sufficient to re-establish the connection. -+value("_attr_","_value_")+ :: The value decorator adds a new metadata ++value("_attr_","_value_"{,escape=true|false})+ :: The value decorator adds a new metadata attribute _attr_ with the value _value_. Agents can mark their data with -specific tags for later demultiplexing. +specific tags for later demultiplexing. By default a value the user entered +will be attached. By setting escape=true, the value will be interpreted and +attempt to replace escape sequences with values from the event's attribute +list. +mask("_attr1_"[,"_attr2_", ...])+ :: The mask decorator outputs inputted events that are modified so that all metadata *except* the attributes