Skip to content

Commit

Permalink
Update Siddhi and jpmml versions
Browse files Browse the repository at this point in the history
  • Loading branch information
pcnfernando committed Sep 17, 2019
1 parent 07f4150 commit af69e51
Show file tree
Hide file tree
Showing 6 changed files with 87 additions and 80 deletions.
3 changes: 2 additions & 1 deletion component/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,9 @@
<modelVersion>4.0.0</modelVersion>

<parent>
<groupId>org.wso2.extension.siddhi.gpl.execution.pmml</groupId>
<groupId>io.siddhi.gpl.execution.pmml</groupId>
<artifactId>siddhi-gpl-execution-pmml-parent</artifactId>
<version>5.0.0-SNAPSHOT</version>
<version>5.0.1-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/

package org.wso2.extension.siddhi.gpl.execution.pmml;
package io.siddhi.gpl.execution.pmml.pmml;

import io.siddhi.annotation.Example;
import io.siddhi.annotation.Extension;
Expand All @@ -40,6 +40,7 @@
import io.siddhi.core.util.config.ConfigReader;
import io.siddhi.core.util.snapshot.state.State;
import io.siddhi.core.util.snapshot.state.StateFactory;
import io.siddhi.gpl.execution.pmml.pmml.util.PMMLUtil;
import io.siddhi.query.api.definition.AbstractDefinition;
import io.siddhi.query.api.definition.Attribute;
import io.siddhi.query.api.exception.SiddhiAppValidationException;
Expand All @@ -56,7 +57,6 @@
import org.jpmml.evaluator.OutputField;
import org.jpmml.evaluator.PMMLException;
import org.jpmml.evaluator.TargetField;
import org.wso2.extension.siddhi.gpl.execution.pmml.util.PMMLUtil;

import java.util.ArrayList;
import java.util.HashMap;
Expand Down Expand Up @@ -111,7 +111,7 @@
)
}
)
public class PmmlModelProcessor extends StreamProcessor<State> {
public class PmmlModelProcessor extends StreamProcessor<PmmlModelProcessor.ExtensionState> {

private static final Logger logger = Logger.getLogger(PmmlModelProcessor.class);

Expand All @@ -126,11 +126,11 @@ public class PmmlModelProcessor extends StreamProcessor<State> {
private Evaluator evaluator;

@Override
protected StateFactory<State> init(MetaStreamEvent metaStreamEvent, AbstractDefinition inputDefinition,
ExpressionExecutor[] attributeExpressionExecutors, ConfigReader configReader,
StreamEventClonerHolder streamEventClonerHolder,
boolean outputExpectsExpiredEvents, boolean findToBeExecuted,
SiddhiQueryContext siddhiQueryContext) {
protected StateFactory<ExtensionState> init(MetaStreamEvent metaStreamEvent,
AbstractDefinition abstractDefinition,
ExpressionExecutor[] expressionExecutors,
ConfigReader configReader, StreamEventClonerHolder streamEventClonerHolder,
boolean b, boolean b1, SiddhiQueryContext siddhiQueryContext) {
if (attributeExpressionExecutors.length == 0) {
throw new SiddhiAppValidationException("PMML model definition not available.");
} else {
Expand Down Expand Up @@ -163,64 +163,65 @@ protected StateFactory<State> init(MetaStreamEvent metaStreamEvent, AbstractDefi
this.outputFields.put(outputField.getName(), outputField.getDataType());
}
}
return null;
return () -> new ExtensionState();
}

@Override
protected void process(ComplexEventChunk<StreamEvent> streamEventChunk, Processor nextProcessor,
protected void process(ComplexEventChunk<StreamEvent> complexEventChunk, Processor processor,
StreamEventCloner streamEventCloner, ComplexEventPopulater complexEventPopulater,
State state) {
StreamEvent event = streamEventChunk.getFirst();
Map<FieldName, FieldValue> inData = new HashMap<>();
ExtensionState extensionState) {
while (complexEventChunk.hasNext()) {
StreamEvent event = complexEventChunk.next();
Map<FieldName, FieldValue> inData = new HashMap<>();

for (Map.Entry<InputField, int[]> entry : attributeIndexMap.entrySet()) {
InputField inputField = entry.getKey();
int[] attributeIndexArray = entry.getValue();
Object dataValue = null;
switch (attributeIndexArray[2]) {
case 0:
dataValue = event.getBeforeWindowData()[attributeIndexArray[3]];
break;
case 2:
dataValue = event.getOutputData()[attributeIndexArray[3]];
break;
default:
break;
}
try {
inData.put(inputField.getName(), inputField.prepare(String.valueOf(dataValue)));
} catch (InvalidResultException e) {
logger.error(String.format("Incompatible value for field: %s. Prediction might be erroneous.",
inputField.getName()), e);
throw new SiddhiAppRuntimeException(String.format("Incompatible value for field: %s. Prediction might "
+ "be erroneous.", inputField.getName()), e);
for (Map.Entry<InputField, int[]> entry : attributeIndexMap.entrySet()) {
InputField inputField = entry.getKey();
int[] attributeIndexArray = entry.getValue();
Object dataValue = null;
switch (attributeIndexArray[2]) {
case 0:
dataValue = event.getBeforeWindowData()[attributeIndexArray[3]];
break;
case 2:
dataValue = event.getOutputData()[attributeIndexArray[3]];
break;
default:
break;
}
try {
inData.put(inputField.getName(), inputField.prepare(String.valueOf(dataValue)));
} catch (InvalidResultException e) {
logger.error(String.format("Incompatible value for field: %s. Prediction might be erroneous.",
inputField.getName()), e);
throw new SiddhiAppRuntimeException(String.format("Incompatible value for field: %s. " +
"Prediction might be erroneous.", inputField.getName()), e);
}
}
}

if (!inData.isEmpty()) {
try {
Map<FieldName, ?> result = evaluator.evaluate(inData);
Object[] output = new Object[outputFields.size()];
int i = 0;
for (FieldName fieldName : outputFields.keySet()) {
if (result.containsKey(fieldName)) {
Object value = result.get(fieldName);
output[i] = EvaluatorUtil.decode(value);
i++;
if (!inData.isEmpty()) {
try {
Map<FieldName, ?> result = evaluator.evaluate(inData);
Object[] output = new Object[outputFields.size()];
int i = 0;
for (FieldName fieldName : outputFields.keySet()) {
if (result.containsKey(fieldName)) {
Object value = result.get(fieldName);
output[i] = EvaluatorUtil.decode(value);
i++;
}
}
complexEventPopulater.populateComplexEvent(event, output);
nextProcessor.process(complexEventChunk);
} catch (PMMLException e) {
logger.error("Error while predicting. Invalid result occurred while evaluating the model",
e);
throw new SiddhiAppRuntimeException("Error while predicting", e);
}
complexEventPopulater.populateComplexEvent(event, output);
nextProcessor.process(streamEventChunk);
} catch (PMMLException e) {
logger.error("Error while predicting. Invalid result occurred while evaluating the model", e);
throw new SiddhiAppRuntimeException("Error while predicting", e);
}
}
}

@Override
public void start() {

try {
populateFeatureAttributeMapping();
} catch (Exception e) {
Expand All @@ -232,11 +233,8 @@ public void start() {

/**
* Match the attribute index values of stream with feature names of the model.
*
* @throws Exception ExceutionPlanCreationException
*/
private void populateFeatureAttributeMapping() throws Exception {

private void populateFeatureAttributeMapping() {
attributeIndexMap = new HashMap<>();
HashMap<String, InputField> features = new HashMap<>();

Expand Down Expand Up @@ -279,7 +277,6 @@ private void populateFeatureAttributeMapping() throws Exception {
* @return List Siddhi Output Attribute List
*/
private List<Attribute> generateOutputAttributes() {

List<Attribute> outputAttributes = new ArrayList<>();
for (Map.Entry<FieldName, org.dmg.pmml.DataType> entry : outputFields.entrySet()) {
FieldName fieldName = entry.getKey();
Expand All @@ -298,7 +295,6 @@ private List<Attribute> generateOutputAttributes() {
* @return Attribute.Type Mapped Siddhi Attribute
*/
private Attribute.Type mapOutputAttributes(org.dmg.pmml.DataType dataType) {

Attribute.Type type = null;
if (dataType.equals(org.dmg.pmml.DataType.DOUBLE)) {
type = Attribute.Type.DOUBLE;
Expand Down Expand Up @@ -329,4 +325,20 @@ public List<Attribute> getReturnAttributes() {
public ProcessingMode getProcessingMode() {
return ProcessingMode.BATCH;
}

static class ExtensionState extends State {
@Override
public boolean canDestroy() {
return false;
}

@Override
public Map<String, Object> snapshot() {
return null;
}

@Override
public void restore(Map<String, Object> map) {
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,28 +15,24 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/

package org.wso2.extension.siddhi.gpl.execution.pmml.util;
package io.siddhi.gpl.execution.pmml.pmml.util;

import io.siddhi.core.exception.SiddhiAppCreationException;
import org.apache.log4j.Logger;
import org.dmg.pmml.PMML;
import org.jpmml.model.ImportFilter;
import org.jpmml.model.JAXBUtil;
import org.xml.sax.InputSource;
import org.xml.sax.SAXException;

import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.StringReader;
import java.io.InputStream;
import java.io.UnsupportedEncodingException;
import javax.xml.bind.JAXBException;
import javax.xml.transform.Source;

/**
* Class implementing Pmml Model Processor.
*/
public class PMMLUtil {
private static final Logger logger = Logger.getLogger(PMMLUtil.class);

/**
* Unmarshal the definition and get an executable pmml model.
Expand All @@ -47,18 +43,16 @@ public static PMML unmarshal(String pmmlDefinition) {

try {
File pmmlFile = new File(pmmlDefinition);
InputSource pmmlSource;
Source source;
InputStream pmmlSource;
// if the given is a file path, read the pmml definition from the file
if (pmmlFile.isFile() && pmmlFile.canRead()) {
pmmlSource = new InputSource(new FileInputStream(pmmlFile));
pmmlSource = new FileInputStream(pmmlFile);
} else {
// else, read from the given definition
pmmlSource = new InputSource(new StringReader(pmmlDefinition));
pmmlSource = new ByteArrayInputStream(pmmlDefinition.getBytes("UTF8"));
}
source = ImportFilter.apply(pmmlSource);
return JAXBUtil.unmarshalPMML(source);
} catch (SAXException | JAXBException | FileNotFoundException e) {
return org.jpmml.model.PMMLUtil.unmarshal(pmmlSource);
} catch (SAXException | JAXBException | FileNotFoundException | UnsupportedEncodingException e) {
throw new SiddhiAppCreationException("Failed to unmarshal the pmml definition: "
+ pmmlDefinition + ". " + e.getMessage(), e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/

package org.wso2.extension.siddhi.gpl.execution.pmml;
package io.siddhi.gpl.execution.pmml.pmml;

import io.siddhi.core.SiddhiAppRuntime;
import io.siddhi.core.SiddhiManager;
Expand Down
2 changes: 1 addition & 1 deletion component/src/test/resources/testng.xml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
<suite name="MonogStoreTestSuite" verbose="1">
<test name="MonogStoreTest">
<classes>
<class name="org.wso2.extension.siddhi.gpl.execution.pmml.PMMLModelProcessorTestCase" />
<class name="io.siddhi.gpl.execution.pmml.pmml.PMMLModelProcessorTestCase" />
</classes>
</test>
</suite>
8 changes: 4 additions & 4 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,9 @@
<version>5</version>
</parent>

<groupId>org.wso2.extension.siddhi.gpl.execution.pmml</groupId>
<groupId>io.siddhi.gpl.execution.pmml</groupId>
<artifactId>siddhi-gpl-execution-pmml-parent</artifactId>
<version>5.0.0-SNAPSHOT</version>
<version>5.0.1-SNAPSHOT</version>
<packaging>pom</packaging>

Expand All @@ -46,8 +47,7 @@
</scm>

<properties>
<siddhi.version>5.0.0</siddhi.version>
<siddhi.version.range>[5.0.0,6.0.0)</siddhi.version.range>
<siddhi.version>5.1.4</siddhi.version>
<log4j.version>1.2.17.wso2v1</log4j.version>
<junit.version>4.12</junit.version>
<commons.logging.version>1.1.1</commons.logging.version>
Expand All @@ -56,7 +56,7 @@
<incremental.build.plugin.version>1.3</incremental.build.plugin.version>
<scr.plugin.version>1.24</scr.plugin.version>
<maven.project.plugin.version>2.9</maven.project.plugin.version>
<jpmml.evaluator.version>1.3.10</jpmml.evaluator.version>
<jpmml.evaluator.version>1.4.13</jpmml.evaluator.version>
<jacoco.plugin.version>0.7.9</jacoco.plugin.version>
</properties>

Expand Down

0 comments on commit af69e51

Please sign in to comment.