Skip to content

Commit

Permalink
refactor some Java in the DataFilter class
Browse files Browse the repository at this point in the history
Fixes #74
  • Loading branch information
Guy Boertje authored and jordansissel committed Nov 15, 2016
1 parent 2cdd589 commit 7543d8f
Show file tree
Hide file tree
Showing 16 changed files with 402 additions and 85 deletions.
10 changes: 10 additions & 0 deletions .gitignore
Expand Up @@ -2,3 +2,13 @@
Gemfile.lock
.bundle
vendor

# build dirs
build
.gradle

# Intellij
.idea
*.iml

gradle.properties
4 changes: 3 additions & 1 deletion build.gradle
Expand Up @@ -46,11 +46,13 @@ dependencies {
compileOnly group: "org.apache.logging.log4j", name: "log4j-api", version: "2.6.2"
compileOnly group: "org.apache.logging.log4j", name: "log4j-core", version: "2.6.2"
compileOnly group: "joda-time", name: "joda-time", version: "2.9.4"
testCompile group: 'junit', name: 'junit', version: '4.12'

testCompile group: 'junit', name: 'junit', version: '4.12'
testCompile group: "org.apache.logging.log4j", name: "log4j-api", version: "2.6.2"
testCompile group: "org.apache.logging.log4j", name: "log4j-core", version: "2.6.2"
testCompile group: "joda-time", name: "joda-time", version: "2.9.4"
testCompile group: 'org.jruby', name: 'jruby-complete', version: "1.7.26"
testCompile fileTree(dir: logstashCoreEventGemPath, include: '**/*.jar')

compileOnly group: 'org.jruby', name: 'jruby-complete', version: "1.7.26"
compileOnly fileTree(dir: logstashCoreEventGemPath, include: '**/*.jar')
Expand Down
18 changes: 9 additions & 9 deletions lib/logstash/filters/date.rb
Expand Up @@ -170,21 +170,21 @@ def initialize(config = {})
end

source = @match.first
parsers = []
@match[1..-1].map do |format|
parsers << org.logstash.filters.parser.TimestampParserFactory.makeParser(format, @locale, @timezone)

@datefilter = org.logstash.filters.DateFilter.new(source, @target, @tag_on_failure) do |event|
filter_matched(event)
end

@match[1..-1].map do |format|
@datefilter.accept_filter_config(format, @locale, @timezone)

# Offer a fallback parser such that if the default system Locale is non-english and that no locale is set,
# we should try to parse english if the first local parsing fails.:w
if !@locale && "en" != java.util.Locale.getDefault().getLanguage() && (format.include?("MMM") || format.include?("E"))
parsers << org.logstash.filters.parser.TimestampParserFactory.makeParser(format, "en-US", @timezone)
@datefilter.accept_filter_config(format, "en-US", @timezone)
end
end
args = [source, parsers, @target, @tag_on_failure, @timezone]
#p :args => args
@datefilter = org.logstash.filters.DateFilter.new(*args) do |event|
filter_matched(event)
end

end # def initialize

def multi_filter(events)
Expand Down
123 changes: 54 additions & 69 deletions src/main/java/org/logstash/filters/DateFilter.java
Expand Up @@ -20,103 +20,72 @@

import org.joda.time.Instant;
import org.logstash.Event;
import org.logstash.Timestamp;
import org.logstash.ext.JrubyEventExtLibrary;
import org.logstash.ext.JrubyEventExtLibrary.RubyEvent;
import org.logstash.filters.parser.CasualISO8601Parser;
import org.logstash.filters.parser.JodaParser;
import org.logstash.filters.parser.TimestampParser;
import org.logstash.filters.parser.TimestampParserFactory;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class DateFilter {
private final boolean dynamicTimeZone;
private final String sourceField;
private final TimestampParser[] parsers;
private final String targetField;
private final String[] tagOnFailure;
private final RubySuccessHandler successHandler;
private String timeZone;
private RubySuccessHandler successHandler;
private final List<ParserExecutor> executors = new ArrayList<>();
private final ResultSetter setter;

public interface RubySuccessHandler {
void success(RubyEvent event);
}

public DateFilter(String sourceField, List<TimestampParser> parsers, String targetField, List<String> tagOnFailure, String timeZone, RubySuccessHandler successHandler) {
public DateFilter(String sourceField, String targetField, List<String> tagOnFailure, RubySuccessHandler successHandler) {
this.sourceField = sourceField;
this.parsers = parsers.toArray(new TimestampParser[0]);
this.targetField = targetField;
this.tagOnFailure = tagOnFailure.toArray(new String[0]);
this.timeZone = timeZone;
this.dynamicTimeZone = timeZone != null && timeZone.contains("%{");
this.successHandler = successHandler;
if (targetField.equals("@timestamp")) {
this.setter = new TimestampSetter();
} else {
this.setter = new FieldSetter(targetField);
}
}

public void register() {
// Nothing to do.
public DateFilter(String sourceField, String targetField, List<String> tagOnFailure) {
this.sourceField = sourceField;
this.tagOnFailure = tagOnFailure.toArray(new String[0]);
if (targetField.equals("@timestamp")) {
this.setter = new TimestampSetter();
} else {
this.setter = new FieldSetter(targetField);
}
}

public void acceptFilterConfig(String format, String locale, String timezone) {
TimestampParser parser = TimestampParserFactory.makeParser(format, locale, timezone);
if (parser instanceof JodaParser || parser instanceof CasualISO8601Parser) {
executors.add(new TextParserExecutor(parser, timezone));
} else {
executors.add(new NumericParserExecutor(parser));
}
}

// public void register() {
// // Nothing to do.
// }

//public Event[] receive(List<org.logstash.ext.JrubyEventExtLibrary.RubyEvent> rubyEvents) {

public List<RubyEvent> receive(List<RubyEvent> rubyEvents) {
for (RubyEvent rubyEvent : rubyEvents) {
Event event = rubyEvent.getEvent();
// XXX: Check for cast failures
//System.out.printf("Event: %s\n", event.toString());
//System.out.printf("Source: %s\n", sourceField);
Object input = event.getField(sourceField);
//System.out.printf("Parsing: %s\n", input);
if (input == null) {
continue;
}
boolean success = false;
for (TimestampParser parser : parsers) {
try {
//System.out.printf(" --> Trying %s\n", parser);
// XXX: I am not certain `input.toString()` is best, here. This allows non-string values
// to be parsed, such as Doubles, Longs, etc.
Instant instant;
if (parser instanceof JodaParser || parser instanceof CasualISO8601Parser) {
if (!(input instanceof String)) {
throw new IllegalArgumentException("Cannot parse date for value of type " + input.getClass().getName());
}

if (dynamicTimeZone) {
// event.sprintf here can throw IOException due to a field reference lookup failure.
//System.out.printf(" WithTimeZone: %s => %s", timeZone, event.sprintf(timeZone));
instant = parser.parseWithTimeZone(input.toString(), event.sprintf(timeZone));
} else {
instant = parser.parse((String) input);
}
} else {
if (input instanceof String) {
instant = parser.parse((String) input);
} else if (input instanceof Long) {
instant = parser.parse((Long) input);
} else if (input instanceof Double) {
instant = parser.parse((Double) input);
} else {
throw new IllegalArgumentException("Cannot parse date for value of type " + input.getClass().getName());
}
}

if (targetField.equals("@timestamp")) {
event.setTimestamp(new Timestamp(instant.getMillis()));
} else {
event.setField(targetField, new Timestamp(instant.getMillis()));
}

success = true;
break;
} catch (IllegalArgumentException|IOException e) {
// XXX: Store the last exception
//System.out.printf("Exception => %s\n", e);
}
}

if (success) {
ParseExecutionResult code = executeParsers(event);
if (ParseExecutionResult.NO_INPUT_FOUND == code) {
continue;
} else if (ParseExecutionResult.SUCCESS == code) {
if (successHandler != null) {
successHandler.success(rubyEvent);
}
Expand All @@ -126,8 +95,24 @@ public List<RubyEvent> receive(List<RubyEvent> rubyEvents) {
}
}
}

// multi_filter api in Logstash::Filters needs us to return the events.
return rubyEvents;
}

public ParseExecutionResult executeParsers(Event event) {
Object input = event.getField(sourceField);
//System.out.printf("Parsing: %s\n", input);
if (input == null) {
return ParseExecutionResult.NO_INPUT_FOUND;
}
for (ParserExecutor executor : executors) {
try {
Instant instant = executor.execute(input, event);
setter.set(event, instant);
return ParseExecutionResult.SUCCESS;
} catch (IllegalArgumentException | IOException e) {
// do nothing, try next ParserExecutor
}
}
return ParseExecutionResult.FAIL;
}
}
25 changes: 25 additions & 0 deletions src/main/java/org/logstash/filters/DynamicTzInputHandler.java
@@ -0,0 +1,25 @@
package org.logstash.filters;

import org.joda.time.Instant;
import org.logstash.Event;
import org.logstash.filters.parser.TimestampParser;

import java.io.IOException;

class DynamicTzInputHandler implements InputHandler {
private TimestampParser parser;
private String timeZone;

public DynamicTzInputHandler(TimestampParser parser, String timeZone) {
this.parser = parser;
this.timeZone = timeZone;
}

public DynamicTzInputHandler(TimestampParser parser) {
this.parser = parser;
}

public Instant handle(String input, Event event) throws IOException {
return this.parser.parseWithTimeZone(input, event.sprintf(timeZone));
}
}
17 changes: 17 additions & 0 deletions src/main/java/org/logstash/filters/FieldSetter.java
@@ -0,0 +1,17 @@
package org.logstash.filters;

import org.joda.time.Instant;
import org.logstash.Event;
import org.logstash.Timestamp;

class FieldSetter implements ResultSetter {
private String target;

FieldSetter(String target) {
this.target = target;
}

public void set(Event event, Instant instant) {
event.setField(this.target, new Timestamp(instant.getMillis()));
}
}
10 changes: 10 additions & 0 deletions src/main/java/org/logstash/filters/InputHandler.java
@@ -0,0 +1,10 @@
package org.logstash.filters;

import org.joda.time.Instant;
import org.logstash.Event;

import java.io.IOException;

interface InputHandler {
Instant handle(String input, Event event) throws IOException;
}
28 changes: 28 additions & 0 deletions src/main/java/org/logstash/filters/NumericParserExecutor.java
@@ -0,0 +1,28 @@
package org.logstash.filters;

import org.joda.time.Instant;
import org.logstash.Event;
import org.logstash.filters.parser.TimestampParser;

import java.io.IOException;

class NumericParserExecutor implements ParserExecutor {
private TimestampParser parser;
public NumericParserExecutor(TimestampParser parser) {
this.parser = parser;
}

public Instant execute(Object input, Event event) throws IOException {
if (input instanceof String) {
return parser.parse((String) input);
} else if (input instanceof Long) {
return parser.parse((Long)input);
} else if (input instanceof Integer) {
return parser.parse(((Integer) input).longValue());
} else if (input instanceof Double) {
return parser.parse((Double) input);
} else {
throw new IllegalArgumentException("Cannot parse date for value of type " + input.getClass().getName());
}
}
}
7 changes: 7 additions & 0 deletions src/main/java/org/logstash/filters/ParseExecutionResult.java
@@ -0,0 +1,7 @@
package org.logstash.filters;

public enum ParseExecutionResult {
SUCCESS,
FAIL,
NO_INPUT_FOUND
}
10 changes: 10 additions & 0 deletions src/main/java/org/logstash/filters/ParserExecutor.java
@@ -0,0 +1,10 @@
package org.logstash.filters;

import org.joda.time.Instant;
import org.logstash.Event;

import java.io.IOException;

interface ParserExecutor {
Instant execute(Object input, Event event) throws IOException;
}
8 changes: 8 additions & 0 deletions src/main/java/org/logstash/filters/ResultSetter.java
@@ -0,0 +1,8 @@
package org.logstash.filters;

import org.joda.time.Instant;
import org.logstash.Event;

interface ResultSetter {
void set(Event event, Instant instant);
}
19 changes: 19 additions & 0 deletions src/main/java/org/logstash/filters/StringInputHandler.java
@@ -0,0 +1,19 @@
package org.logstash.filters;

import org.joda.time.Instant;
import org.logstash.Event;
import org.logstash.filters.parser.TimestampParser;

import java.io.IOException;

class StringInputHandler implements InputHandler {
private TimestampParser parser;

public StringInputHandler(TimestampParser parser) {
this.parser = parser;
}

public Instant handle(String input, Event event) throws IOException {
return this.parser.parse(input);
}
}
30 changes: 30 additions & 0 deletions src/main/java/org/logstash/filters/TextParserExecutor.java
@@ -0,0 +1,30 @@
package org.logstash.filters;

import org.joda.time.Instant;
import org.logstash.Event;
import org.logstash.filters.parser.TimestampParser;

import java.io.IOException;

class TextParserExecutor implements ParserExecutor {
private InputHandler handler;

public TextParserExecutor(TimestampParser parser, String timeZone) {
if (timeZone != null && timeZone.contains("%{")) {
this.handler = new DynamicTzInputHandler(parser, timeZone);
} else {
this.handler = new StringInputHandler(parser);
}
}

public Instant execute(Object input, Event event) throws IOException {
if (!(input instanceof String)) {
throw new IllegalArgumentException("Cannot parse date for value of type " + input.getClass().getName());
}
return this.execute((String) input, event);
}

private Instant execute(String input, Event event) throws IOException {
return this.handler.handle(input, event);
}
}

0 comments on commit 7543d8f

Please sign in to comment.