Skip to content

Commit

Permalink
FLUME-676: Allow EL values in value() decorator
Browse files Browse the repository at this point in the history
  • Loading branch information
alexteaa authored and Jonathan Hsieh committed Jun 27, 2011
1 parent 6f719ae commit 89210af
Show file tree
Hide file tree
Showing 4 changed files with 91 additions and 12 deletions.
Expand Up @@ -28,20 +28,33 @@

/**
* This decorator adds values to events that pass through it.
* By default decorator does not escape value, use "escape=true" to escape it.
*/
public class ValueDecorator<S extends EventSink> extends EventSinkDecorator<S> {

public static final String KW_ESCAPE = "escape";
final String attr; // attribute to tag

// We store the value in two forms for optimization purposes
final String unescapedValue;
final byte[] value;

public ValueDecorator(S s, String attr, byte[] value) {
final boolean escape;

public ValueDecorator(S s, String attr, String value) {
this(s, attr, value, false);
}

public ValueDecorator(S s, String attr, String value, boolean escape) {
super(s);
this.attr = attr;
this.value = value.clone();
this.escape = escape;
this.unescapedValue = value;
this.value = value.getBytes().clone();
}

public void append(Event e) throws IOException, InterruptedException {
e.set(attr, value);
byte[] attrVal = escape ? e.escapeString(unescapedValue).getBytes() : value;
e.set(attr, attrVal);
super.append(e);
}

Expand All @@ -51,10 +64,12 @@ public static SinkDecoBuilder builder() {
public EventSinkDecorator<EventSink> build(Context context,
String... argv) {
Preconditions.checkArgument(argv.length == 2,
"usage: value(\"attr\", \"value\")");
"usage: value(\"attr\", \"value\"{, " + KW_ESCAPE + "=true|false})");
String attr = argv[0];
String v = argv[1];
return new ValueDecorator<EventSink>(null, attr, v.getBytes());
String escapedArg = context.getValue(KW_ESCAPE);
boolean escape = (escapedArg == null) ? false : Boolean.parseBoolean(escapedArg);
return new ValueDecorator<EventSink>(null, attr, v, escape);
}
};

Expand Down
@@ -0,0 +1,63 @@
/**
* Licensed to Cloudera, Inc. under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Cloudera, Inc. licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.cloudera.flume.handlers.endtoend;

import java.io.IOException;

import com.cloudera.flume.core.Event;
import com.cloudera.flume.core.EventImpl;
import com.cloudera.flume.handlers.debug.ConsoleEventSink;
import org.junit.Assert;
import org.junit.Test;

/**
* Some tests to verify value decorator behaviour
*/
public class TestValueDecorator {
@Test
public void testValueDecorator() throws IOException, InterruptedException {
String value = "value%{nanos}";
ValueDecorator dec = new ValueDecorator(new ConsoleEventSink(), "attr", value, false);
dec.open();
Event e = new EventImpl("body".getBytes(), 1234567L, Event.Priority.INFO, 87654321L, "host");

dec.append(e);

Assert.assertNotNull("Attribute wasn't added", e.get("attr"));
// Attribute value should NOT be escaped
Assert.assertArrayEquals("Attribute value is incorrect", value.getBytes(), e.get("attr"));

dec.close();
}

@Test
public void testValueDecoratorWithEscape() throws IOException, InterruptedException {
String value = "value-%{nanos}-%{body}";
ValueDecorator dec = new ValueDecorator(new ConsoleEventSink(), "attr", value, true);
dec.open();
Event e = new EventImpl("bodyString".getBytes(), 1234567L, Event.Priority.INFO, 87654321L, "host");

dec.append(e);

Assert.assertNotNull("Attribute wasn't added", e.get("attr"));
Assert.assertEquals("Attribute value is incorrect", "value-87654321-bodyString", new String(e.get("attr")));

dec.close();
}
}

Expand Up @@ -54,11 +54,9 @@ public void testTagConflict() {
@Test
public void testMaskNoConflict() throws IOException, InterruptedException {
MemorySinkSource mem = new MemorySinkSource();
EventSink s1 = new ValueDecorator<EventSink>(mem, "duped",
"second".getBytes());
EventSink s1 = new ValueDecorator<EventSink>(mem, "duped", "second");
EventSink s2 = new MaskDecorator<EventSink>(s1, "duped");
EventSink snk = new ValueDecorator<EventSink>(s2, "duped",
"first".getBytes());
EventSink snk = new ValueDecorator<EventSink>(s2, "duped", "first");
snk.open();

Event e = new EventImpl("foo".getBytes());
Expand Down
7 changes: 5 additions & 2 deletions flume-docs/src/docs/UserGuide/Appendix
Expand Up @@ -387,9 +387,12 @@ If this second attempt fails, it throws an exception. This is useful in
conjunction with network sinks where connections can be broken. The open/
close retry attempt is often sufficient to re-establish the connection.

+value("_attr_","_value_")+ :: The value decorator adds a new metadata
+value("_attr_","_value_"{,escape=true|false})+ :: The value decorator adds a new metadata
attribute _attr_ with the value _value_. Agents can mark their data with
specific tags for later demultiplexing.
specific tags for later demultiplexing. By default a value the user entered
will be attached. By setting escape=true, the value will be interpreted and
attempt to replace escape sequences with values from the event's attribute
list.

+mask("_attr1_"[,"_attr2_", ...])+ :: The mask decorator outputs inputted
events that are modified so that all metadata *except* the attributes
Expand Down

0 comments on commit 89210af

Please sign in to comment.