Skip to content

Commit

Permalink
METRON-1681 Decouple the ParserBolt from the Parse execution logic (m…
Browse files Browse the repository at this point in the history
…errimanr) closes apache#1213
  • Loading branch information
merrimanr authored and justinleet committed Oct 25, 2018
1 parent 88fc032 commit cef3532
Show file tree
Hide file tree
Showing 20 changed files with 1,430 additions and 1,060 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,10 @@
import static org.apache.metron.rest.MetronRestConstants.GROK_CLASS_NAME;

import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import org.apache.curator.framework.CuratorFramework;
import org.apache.hadoop.fs.Path;
import org.apache.metron.common.configuration.ConfigurationType;
Expand All @@ -35,18 +33,14 @@
import org.apache.metron.common.zookeeper.ConfigurationsCache;
import org.apache.metron.parsers.interfaces.MessageParser;
import org.apache.metron.parsers.interfaces.MessageParserResult;
import org.apache.metron.parsers.interfaces.MultilineMessageParser;
import org.apache.metron.rest.MetronRestConstants;
import org.apache.metron.rest.RestException;
import org.apache.metron.rest.model.ParseMessageRequest;
import org.apache.metron.rest.service.GrokService;
import org.apache.metron.rest.service.SensorParserConfigService;
import org.apache.metron.rest.util.ParserIndex;
import org.apache.metron.common.zookeeper.ZKConfigurationsCache;
import org.apache.zookeeper.KeeperException;
import org.json.simple.JSONObject;
import org.reflections.Reflections;
import org.reflections.util.ConfigurationBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

Expand Down Expand Up @@ -141,61 +135,20 @@ public JSONObject parseMessage(ParseMessageRequest parseMessageRequest) throws R
} else if (sensorParserConfig.getParserClassName() == null) {
throw new RestException("SensorParserConfig must have a parserClassName");
} else {
MultilineMessageParser<JSONObject> parser;
Object parserObject;
MessageParser<JSONObject> parser;
try {
parserObject = Class.forName(sensorParserConfig.getParserClassName())
parser = (MessageParser<JSONObject>) Class.forName(sensorParserConfig.getParserClassName())
.newInstance();
} catch (Exception e) {
throw new RestException(e.toString(), e.getCause());
}

if (!(parserObject instanceof MultilineMessageParser)) {
parser = new MultilineMessageParser<JSONObject>() {

@Override
@SuppressWarnings("unchecked")
public void configure(Map<String, Object> config) {
((MessageParser<JSONObject>)parserObject).configure(config);
}

@Override
@SuppressWarnings("unchecked")
public void init() {
((MessageParser<JSONObject>)parserObject).init();
}

@Override
@SuppressWarnings("unchecked")
public boolean validate(JSONObject message) {
return ((MessageParser<JSONObject>)parserObject).validate(message);
}

@Override
@SuppressWarnings("unchecked")
public List<JSONObject> parse(byte[] message) {
return ((MessageParser<JSONObject>)parserObject).parse(message);
}

@Override
@SuppressWarnings("unchecked")
public Optional<List<JSONObject>> parseOptional(byte[] message) {
return ((MessageParser<JSONObject>)parserObject).parseOptional(message);
}
};
} else {
parser = (MultilineMessageParser<JSONObject>)parserObject;
}


Path temporaryGrokPath = null;
if (isGrokConfig(sensorParserConfig)) {
String name = parseMessageRequest.getSensorParserConfig().getSensorTopic();
temporaryGrokPath = grokService.saveTemporary(parseMessageRequest.getGrokStatement(), name);
sensorParserConfig.getParserConfig()
.put(MetronRestConstants.GROK_PATH_KEY, new Path(temporaryGrokPath, name).toString());
}

parser.configure(sensorParserConfig.getParserConfig());
parser.init();

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.metron.parsers;

import org.apache.metron.common.error.MetronError;
import org.json.simple.JSONObject;

import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

/**
* Default implementation of ParserRunnerResults.
*/
public class DefaultParserRunnerResults implements ParserRunnerResults<JSONObject> {

private List<JSONObject> messages = new ArrayList<>();
private List<MetronError> errors = new ArrayList<>();

public List<JSONObject> getMessages() {
return messages;
}

public List<MetronError> getErrors() {
return errors;
}

public void addMessage(JSONObject message) {
this.messages.add(message);
}

public void addError(MetronError error) {
this.errors.add(error);
}

public void addErrors(List<MetronError> errors) {
this.errors.addAll(errors);
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
ParserRunnerResults parserResult = (ParserRunnerResults) o;
return Objects.equals(messages, parserResult.getMessages()) &&
Objects.equals(errors, parserResult.getErrors());
}

@Override
public int hashCode() {
int result = messages != null ? messages.hashCode() : 0;
result = 31 * result + (errors != null ? errors.hashCode() : 0);
return result;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import org.apache.metron.common.Constants;
import org.apache.metron.parsers.interfaces.MessageParser;
import org.apache.metron.parsers.interfaces.MessageParserResult;
import org.apache.metron.parsers.interfaces.MultilineMessageParser;
import org.json.simple.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -54,7 +53,7 @@
import java.util.TimeZone;


public class GrokParser implements MultilineMessageParser<JSONObject>, Serializable {
public class GrokParser implements MessageParser<JSONObject>, Serializable {

protected static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.metron.parsers;

import org.apache.metron.common.configuration.ParserConfigurations;
import org.apache.metron.common.message.metadata.RawMessage;
import org.apache.metron.parsers.interfaces.MessageParserResult;
import org.apache.metron.stellar.dsl.Context;

import java.util.List;
import java.util.Set;
import java.util.function.Supplier;

/**
* A ParserRunner is responsible for initializing MessageParsers and parsing messages with the appropriate MessageParser.
* The information needed to initialize a MessageParser is supplied by the parser config supplier. After the parsers
* are initialized, the execute method can then be called for each message and will return a ParserRunnerResults object
* that contains a list of parsed messages and/or a list of errors.
* @param <T> The type of a successfully parsed message.
*/
public interface ParserRunner<T> {

/**
* Return a list of all sensor types that can be parsed with this ParserRunner.
* @return Sensor types
*/
Set<String> getSensorTypes();

/**
*
* @param parserConfigSupplier Supplies parser configurations
* @param stellarContext Stellar context used to apply Stellar functions during field transformations
*/
void init(Supplier<ParserConfigurations> parserConfigSupplier, Context stellarContext);

/**
* Parses a message and either returns the message or an error.
* @param sensorType Sensor type of the message
* @param rawMessage Raw message including metadata
* @param parserConfigurations Parser configurations
* @return ParserRunnerResults containing a list of messages and a list of errors
*/
ParserRunnerResults<T> execute(String sensorType, RawMessage rawMessage, ParserConfigurations parserConfigurations);

}

0 comments on commit cef3532

Please sign in to comment.