-
Notifications
You must be signed in to change notification settings - Fork 131
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
rule解析错误? #25
Comments
不好意思,没看仔细有2个Function, |
我看代码里规则是mock的? |
这个flink的项目解决的问题是:一种数据用一种drools规则去处理。 |
2022-06-02 09:42:17.599 [Legacy Source Thread - Source: Rule Source -> Rule Source Deserialization -> Timestamps/Watermarks (1/1)] WARN com.alarm.eagle.functions.RuleDeserializer:31 - Failed parsing rule, dropping it:
java.io.IOException: Invalid rule (wrong number of tokens): [{
"id": 123,
"name": "eagle_log_rule_1",
"appId": "059b0847-4fda-487a-ab85-7a5e625a8bd1",
"type": "logrules",
"script": "package logrules\n\nimport com.alarm.eagle.util.DateUtil;\nimport com.alarm.eagle.log.LogEntry;\nimport org.slf4j.Logger;\nimport com.alarm.eagle.util.Md5Util;\nimport com.alarm.eagle.util.RegexUtil\nimport java.util.Date;\n\nglobal Logger LOG;\n\nrule "eagle_log_rule_1"\nno-loop true\nsalience 100\nwhen\n $log : LogEntry( index == "eagle_log_1", $msg : message)\nthen\n LOG.debug("receive eagle_log_1 log, id:[{}]", $log.getId());\n String type = $log.getType();\n if ("opm".equals(type)){\n String logTime = RegexUtil.extractString("(\\d{4}-\\d{2}-\\d{2} \\d{2}:\\d{2}:\\d{2}.\\d{3})", $msg);\n if(logTime == null){\n LOG.warn("invalid date or time, log: {}", $msg);\n return;\n }\n Date date = DateUtil.convertFromString("yyyy-MM-dd HH:mm:ss.SSS", logTime);\n $log.setTimestamp(date != null ? date : $log.getAtTimestamp());\n if ($msg.contains("EventTracking")){\n String tracking = RegexUtil.extractString("(EventTracking.+)", $msg);\n if (tracking != null){\n String[] tracks = tracking.split("\\|");\n $log.addField("EventType",tracks[0]);\n $log.addField("LogType",tracks[1]);\n $log.addField("LogId",tracks[2]);\n $log.addField("UserId",tracks[3]);\n $log.addField("LogTime",tracks[4]);\n }\n }\n } else if ("offermanager".equals(type)){\n String logTime = RegexUtil.extractString("(\\d{4}-\\d{2}-\\d{2} \\d{2}:\\d{2}:\\d{2},\\d{3})", $msg);\n if(logTime == null){\n LOG.warn("invalid date or time, log: {}", $msg);\n return;\n }\n Date date = DateUtil.convertFromString("yyyy-MM-dd HH:mm:ss,SSS", logTime);\n $log.setTimestamp(date != null ? date : $log.getAtTimestamp());\n } else {\n return;\n }\n\n long delayTime = (System.currentTimeMillis() - $log.getTimestamp().getTime())/1000;\n if (delayTime > 5243600 || delayTime < -5243600) {\n LOG.warn("Too early or too late log, ignore it, delay:{}, log:{}", delayTime, $log.getTimestamp().getTime());\n return;\n }\n $log.dealDone();\n LOG.debug("out -----eagle_log_1------");\nend",
"version": "20190729",
"state": 1,
"updateTime": 1564475611452
}, {
"id": 456,
"name": "eagle_log_rule_2",
"appId": "2a2df323-d2ea-45ca-bf7e-6d2afa125688",
"type": "logrules",
"script": "package logrules\n\nimport org.slf4j.Logger;\nimport java.util.Date;\nimport java.util.Locale;\nimport com.alarm.eagle.log.LogEntry;\nimport com.alarm.eagle.util.RegexUtil;\nimport com.alarm.eagle.util.DateUtil\nimport java.time.LocalDateTime;\n\nglobal Logger LOG;\n\nrule "eagle_log_rule_2"\n\tno-loop true\n\tsalience 100\n when\n $log : LogEntry(index == "eagle_log_2", $msg : message)\n then\n LOG.debug("receive eagle_log_2 log, id:[{}]", $log.getId());\n String logTime = RegexUtil.extractString("(\\d{4}-\\d{2}-\\d{2} \\d{2}:\\d{2}:\\d{2}.\\d{3})", $msg);\n if(logTime == null){\n LOG.warn("invalid date or time, log: {}", $msg);\n return;\n }\n Date date = DateUtil.convertFromString("yyyy-MM-dd HH:mm:ss,SSS", logTime);\n $log.setTimestamp(date!=null? date:$log.getAtTimestamp());\n\n $log.dealDone();\n LOG.debug("out ----- eagle_log_2 log-----");\nend",
"version": "20190826",
"state": 1,
"updateTime": 1567078969483
}]
at com.alarm.eagle.rule.RuleParser.parsePlain(RuleParser.java:35) ~[classes/:?]
at com.alarm.eagle.rule.RuleParser.fromString(RuleParser.java:24) ~[classes/:?]
at com.alarm.eagle.functions.RuleDeserializer.flatMap(RuleDeserializer.java:25) [classes/:?]
at com.alarm.eagle.functions.RuleDeserializer.flatMap(RuleDeserializer.java:10) [classes/:?]
at org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:50) [flink-streaming-java_2.11-1.11.0.jar:1.11.0]
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717) [flink-streaming-java_2.11-1.11.0.jar:1.11.0]
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692) [flink-streaming-java_2.11-1.11.0.jar:1.11.0]
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672) [flink-streaming-java_2.11-1.11.0.jar:1.11.0]
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52) [flink-streaming-java_2.11-1.11.0.jar:1.11.0]
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30) [flink-streaming-java_2.11-1.11.0.jar:1.11.0]
at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollect(StreamSourceContexts.java:305) [flink-streaming-java_2.11-1.11.0.jar:1.11.0]
at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collect(StreamSourceContexts.java:394) [flink-streaming-java_2.11-1.11.0.jar:1.11.0]
at com.alarm.eagle.source.RuleSourceFunction.run(RuleSourceFunction.java:29) [classes/:?]
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100) [flink-streaming-java_2.11-1.11.0.jar:1.11.0]
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63) [flink-streaming-java_2.11-1.11.0.jar:1.11.0]
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201) [flink-streaming-java_2.11-1.11.0.jar:1.11.0]
请问规则的格式是drl格式还是
{
"ruleId": 1,
"ruleState": "ACTIVE",
"groupingKeyNames": [
"beneficiaryId",
"payerId"
],
"aggregateFieldName": "paymentAmount",
"aggregatorFunctionType": "SUM",
"limitOperatorType": "GREATER",
"limit": 1000000,
"windowMinutes": 1440
}
我看你代码里面是按,split的,你示例格式里面也少了unique?
The text was updated successfully, but these errors were encountered: